diff --git a/IoTBroker-runner/iotbroker.conf.default b/IoTBroker-runner/iotbroker.conf.default index b73d38ff..84c6f698 100755 --- a/IoTBroker-runner/iotbroker.conf.default +++ b/IoTBroker-runner/iotbroker.conf.default @@ -75,6 +75,9 @@ iotbroker_defaultthrottling=1000 # Default subscription duration time iotbroker_defaultduration=31536000 +# System parameters +iotbroker_updatethreadpoolsize=20 + # Ignore IoT Discovery failure (to be enabled if the IoT Discovery is not started) iotbroker_ignoreiotdiscoveryfailure=true @@ -92,9 +95,12 @@ iotbroker_knowledgebaseport=8015 ### Embedded agent configuration # CouchDB configurations +iotbroker_embeddedagent_registrydbname="embeddedagentregistrydb" iotbroker_embeddedagent_couchdbname="historicalrepository" iotbroker_embeddedagent_couchdbcreatedb=true iotbroker_embeddedagent_couchdbprotocol="http" iotbroker_embeddedagent_couchdbhost="127.0.0.1" iotbroker_embeddedagent_couchdbport=5984 iotbroker_embeddedagent_historicallyTrackQueryResponseAndNotifications=false +iotbroker_embeddedagent_storeOnlyLatestValue=false +iotbroker_embeddedagent_localagentid="embeddedagent1" diff --git a/IoTBroker-runner/setup.sh b/IoTBroker-runner/setup.sh index ab34626f..7f80fea7 100755 --- a/IoTBroker-runner/setup.sh +++ b/IoTBroker-runner/setup.sh @@ -154,12 +154,17 @@ setPropertyIntoXML "timestampContextElement" "$iotbroker_timestampcontextelement setPropertyIntoXML "trackContextSource" "$iotbroker_trackcontextsource" "$iotbroker_configxml" setPropertyIntoXML "exposedAddress" "$iotbroker_exposedAddress" "$iotbroker_configxml" setPropertyIntoXML "historicallyTrackQueryResponseAndNotifications" "$iotbroker_embeddedagent_historicallyTrackQueryResponseAndNotifications" "$iotbroker_configxml" +setPropertyIntoXML "updateThreadPoolSize" "$iotbroker_updatethreadpoolsize" "$iotbroker_configxml" +setPropertyIntoXML "historicallyTrackQueryResponseAndNotifications" "$iotbroker_embeddedagent_historicallyTrackQueryResponseAndNotifications" "$iotbroker_configxml" setPropertyIntoXML "couchdb_name" "$iotbroker_embeddedagent_couchdbname" "$iotbroker_embeddedagent_couchdbxml" setPropertyIntoXML "couchdb_createdb" "$iotbroker_embeddedagent_couchdbcreatedb" "$iotbroker_embeddedagent_couchdbxml" setPropertyIntoXML "couchdb_protocol" "$iotbroker_embeddedagent_couchdbprotocol" "$iotbroker_embeddedagent_couchdbxml" setPropertyIntoXML "couchdb_host" "$iotbroker_embeddedagent_couchdbhost" "$iotbroker_embeddedagent_couchdbxml" setPropertyIntoXML "couchdb_port" "$iotbroker_embeddedagent_couchdbport" "$iotbroker_embeddedagent_couchdbxml" +setPropertyIntoXML "storeOnlyLatestValue" "$iotbroker_embeddedagent_storeOnlyLatestValue" "$iotbroker_embeddedagent_couchdbxml" +setPropertyIntoXML "registrydb_name" "$iotbroker_embeddedagent_registrydbname" "$iotbroker_embeddedagent_couchdbxml" +setPropertyIntoXML "embeddedAgentId" "$iotbroker_embeddedagent_localagentid" "$iotbroker_embeddedagent_couchdbxml" setPropertyIntoProperties "log4j.appender.ReportFileAppender.File" "$iotbroker_logfile" "$iotbroker_loggerproperties" setFirstPropertyValueOverMultipleValuesIntoProperties "log4j.rootLogger" "$iotbroker_loglevel" "$iotbroker_loggerproperties" diff --git a/docker/README.md b/docker/README.md index be748b24..7e18ac1c 100644 --- a/docker/README.md +++ b/docker/README.md @@ -25,4 +25,78 @@ docker run -t -p 8065:8065 -p 8060:8060 fiware/iotbroker:v5.4.3-standalone -p ifconfig +docker0 Link encap:Ethernet HWaddr 02:42:0e:f6:a6:67 + inet addr:172.17.0.1 Bcast:0.0.0.0 Mask:255.255.255.0 + inet6 addr: fe80::42:eff:fef6:a667/64 Scope:Link + UP BROADCAST MULTICAST MTU:1500 Metric:1 + RX packets:157061 errors:0 dropped:0 overruns:0 frame:0 + TX packets:202369 errors:0 dropped:0 overruns:0 carrier:0 + collisions:0 txqueuelen:0 + RX bytes:10283839 (10.2 MB) TX bytes:582227340 (582.2 MB) + +``` +Then we need to instruct postgreSQL to accept requests from this virtual network: +``` +sudo echo -e 'host\tall\tall\t172.17.0.0/24\ttrust' >> /etc/postgresql/9.5/main/pg_hba.conf +sudo sed -i "s/^port/listen_addresses = '\*'\nport/g" /etc/postgresql/9.5/main/postgresql.conf +sudo service postgresql restart +``` + +Configure **CouchDB**: + +Similarly we need to allow docker to access couchdb: + +``` +sudo sed -i "s/;bind_address = 127.0.0.1/bind_address = 0\.0\.0\.0/g" "/etc/couchdb/local.ini" +sudo service couchdb restart +``` + +Configure **HSQLDB**: + +First it is necessarry to copy the HSQLDB directory (https://github.com/Aeronbroker/Aeron/tree/master/SQL_database) that contains the database files somewhere in the host file system. We assume that it is located under /home/user/SQL_database. +At this point we need to mount the HSQLDB folder into the docker container and instruct the IoT Broker in the container to access the mounted folder (with the -p option): +``` +docker run -t -v /home/user/SQL_database/:/SQL_database nle/iotbroker:testing20170713 -p iotbroker_hsqldbdirectory="//SQL_database//database//linkDB" +``` + +Running docker with all permanent storages: + +``` +sudo docker run -t -p 8065:8065 -p 8060:8060 \ +-v /tmp/testhsqldbindocker:/testhsqldbindocker \ +nle/iotbroker:testing20170713 \ +-p iotbroker_historicalagent="enabled" \ +-p iotbroker_embeddedagent_registrydbname="embeddedagentregistry" \ +-p iotbroker_embeddedagent_couchdbname="embeddedagenthistorical" \ +-p iotbroker_hsqldbdirectory="//SQL_database//database//linkDB" \ +-p iotbroker_embeddedagent_couchdbhost="172.17.0.1" \ +-p confman_couchdbipandport="http://172.17.0.1:5984" \ +-p confman_couchdbregistercontextdbname="iotdiscoveryregistrations" \ +-p confman_couchdbsubscriptiondbname="iotdiscoverysubscriptions" \ +-p confman_postgresurl='//172.17.0.1/' \ +-p confman_postgresdbname="iotdiscoverypostgres" +``` diff --git a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/FullHttpRequester.java b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/FullHttpRequester.java index a5e59c34..961a5f86 100644 --- a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/FullHttpRequester.java +++ b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/FullHttpRequester.java @@ -105,7 +105,7 @@ public static FullHttpResponse sendPost(URL url, String data, } if (logger.isDebugEnabled()) { - logger.info("\nSending 'POST' request to URL : " + url + "\n" + logger.debug("\nSending 'POST' request to URL : " + url + "\n" + "POST parameters : " + data + "\n"); } @@ -422,10 +422,11 @@ public static FullHttpResponse sendPut(URL url, String data, httpResponse.setBody(response.toString()); - // logger.info("Response Code : " + responseCode); - logger.info("\nPUT to URL : " + url + "\nResponse Code : " - + responseCode + "\n" + "Response : " - + response.toString()); + if (logger.isDebugEnabled()){ + logger.debug("\nPUT to URL : " + url + "\nResponse Code : " + + responseCode + "\n" + "Response : " + + response.toString()); + } con.disconnect(); return httpResponse; @@ -460,11 +461,12 @@ public static FullHttpResponse sendDelete(URL url, String xAuthToken) con.setRequestProperty("X-Auth-Token", xAuthToken); } - logger.info("\nSending 'DELETE' request to URL : " + url); int responseCode = con.getResponseCode(); - logger.info("\nDELETE to URL : " + url + "\nResponse Code : " - + responseCode + "\n"); + if (logger.isDebugEnabled()){ + logger.debug("\nDELETE to URL : " + url + "\nResponse Code : " + + responseCode + "\n"); + } httpResponse = new FullHttpResponse(HttpVersion.HTTP_1_0, con.getResponseCode(), con.getResponseMessage()); diff --git a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/EmbeddedAgentStorageInterface.java b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/EmbeddedAgentStorageInterface.java index 426aa331..7cda18c1 100644 --- a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/EmbeddedAgentStorageInterface.java +++ b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/EmbeddedAgentStorageInterface.java @@ -47,6 +47,7 @@ import java.net.URI; import java.util.Date; import java.util.List; +import java.util.Map; import eu.neclab.iotplatform.ngsi.api.datamodel.ContextElement; import eu.neclab.iotplatform.ngsi.api.datamodel.EntityId; @@ -60,13 +61,15 @@ public interface EmbeddedAgentStorageInterface { // @Override public void setNgsi10Callback(Ngsi10Interface ngsi10Callback); - public void storeLatestData(ContextElement isolatedContextElement); + public Map storeData(List isolatedLatestContextElement, + List isolatedHistoricalContextElement, + Date defaultDate); + + public boolean storeLatestData(ContextElement isolatedContextElement); public void storeHistoricalData(ContextElement isolatedContextElement, Date defaultDate); - // @Override - public List getAllLatestValues(); public ContextElement getLatestValue(String id, URI type, diff --git a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/IoTAgentInterface.java b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/IoTAgentInterface.java index e1a802ee..9016c617 100644 --- a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/IoTAgentInterface.java +++ b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/IoTAgentInterface.java @@ -52,7 +52,7 @@ import eu.neclab.iotplatform.ngsi.api.datamodel.ContextRegistration; import eu.neclab.iotplatform.ngsi.api.datamodel.DiscoverContextAvailabilityResponse; import eu.neclab.iotplatform.ngsi.api.datamodel.EntityId; -import eu.neclab.iotplatform.ngsi.api.datamodel.QueryContextRequest; +import eu.neclab.iotplatform.ngsi.api.datamodel.StatusCode; import eu.neclab.iotplatform.ngsi.api.datamodel.SubscribeContextRequest; import eu.neclab.iotplatform.ngsi.api.ngsi10.Ngsi10Interface; @@ -62,7 +62,7 @@ */ public interface IoTAgentInterface { - void storeData(List contextElementList); + StatusCode storeData(List contextElementList); ContextElement getLatestValue(String id, URI type, String attributeName); diff --git a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/KeyValueStoreInterface.java b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/KeyValueStoreInterface.java index 46c38493..0039098e 100644 --- a/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/KeyValueStoreInterface.java +++ b/eu.neclab.iotplatform.iotbroker.commons/src/main/java/eu/neclab/iotplatform/iotbroker/commons/interfaces/KeyValueStoreInterface.java @@ -46,6 +46,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import com.google.common.collect.Multimap; @@ -57,6 +58,14 @@ public interface KeyValueStoreInterface { boolean storeValue(String key, ContextElement contextElement); + boolean storeValue(String key, ContextElement contextElement, + boolean cacheAfterStore); + + public Map storeAndUpdateValues( + Map keyValuesToStore, + Map keyValuesToUpdate, + boolean cacheAfterStoring); + public Collection getKeys(String startKey, String endKey); public Multimap getIdsByType(); @@ -64,8 +73,8 @@ public interface KeyValueStoreInterface { ContextElement getValue(String latestValueDocumentKey); ContextElement getValues(String startKey, String endKey); - - List getValues(List keys); + + Collection getValues(List keys); List getAllValues(String startKey, String endKey); diff --git a/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/IotBrokerCore.java b/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/IotBrokerCore.java index 04fc3816..ca8bb7d7 100644 --- a/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/IotBrokerCore.java +++ b/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/IotBrokerCore.java @@ -50,8 +50,10 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -94,6 +96,7 @@ import eu.neclab.iotplatform.iotbroker.storage.LinkSubscriptionAvailabilityInterface; import eu.neclab.iotplatform.iotbroker.storage.SubscriptionStorageInterface; import eu.neclab.iotplatform.ngsi.api.datamodel.Code; +import eu.neclab.iotplatform.ngsi.api.datamodel.ContextAttribute; import eu.neclab.iotplatform.ngsi.api.datamodel.ContextElement; import eu.neclab.iotplatform.ngsi.api.datamodel.ContextElementResponse; import eu.neclab.iotplatform.ngsi.api.datamodel.ContextMetadata; @@ -259,7 +262,9 @@ public void setQueryServiceList(List queryServiceList) { */ private OnTimeIntervalHandlerInterface onTimeIntervalHandler; - private ExecutorService executor = Executors.newFixedThreadPool(20); + @Value("${updateThreadPoolSize:20}") + private int updateThreadPoolSize; + private ExecutorService executor; /** * Interface for hierarchies of IoT Broker instances; extra bundle not @@ -539,6 +544,16 @@ public void enableAssociations(boolean associationsEnabled) { this.associationsEnabled = associationsEnabled; } + public int getUpdateThreadPoolSize() { + return updateThreadPoolSize; + } + + public void setUpdateThreadPoolSize(int updateThreadPoolSize) { + this.updateThreadPoolSize = updateThreadPoolSize; + this.executor = Executors.newFixedThreadPool(updateThreadPoolSize); + logger.info("execturo of size: " + updateThreadPoolSize); + } + @PostConstruct public void postConstruct() { if ("null".equals(pubSubUrl_ngsiv1_orion)) { @@ -555,6 +570,8 @@ public void postConstruct() { pubSubUrlList_ngsiv1 = getListOfUpdateAdress(pubSubUrl_ngsiv1); } + this.executor = Executors.newFixedThreadPool(updateThreadPoolSize); + } /** @@ -671,7 +688,9 @@ public QueryContextResponse queryContext(QueryContextRequest request) { */ if ((discoveryResponse.getErrorCode() == null || discoveryResponse .getErrorCode().getCode() == 200) - && discoveryResponse.getContextRegistrationResponse() != null) { + && discoveryResponse.getContextRegistrationResponse() != null + && !discoveryResponse.getContextRegistrationResponse() + .isEmpty()) { List contextRegistrationToQuery = new ArrayList(); contextRegistrationToQuery.addAll(discoveryResponse @@ -1049,16 +1068,57 @@ private static void applyRestriction(String attributeExpr, while (i.hasNext()) { ContextElementResponse contextElresp = i.next(); - Document doc = XmlFactory.stringToDocument(contextElresp - .toString()); + /* + * First check if it is complient with the XPath the entityId + * itself + */ + ContextElement entityAndDomainMetadataOnly = new ContextElement( + contextElresp.getContextElement().getEntityId(), + contextElresp.getContextElement() + .getAttributeDomainName(), null, contextElresp + .getContextElement().getDomainMetadata()); + + Document doc = XmlFactory + .stringToDocument(entityAndDomainMetadataOnly + .toString()); Object result = expr.evaluate(doc, XPathConstants.NODESET); NodeList nodes = (NodeList) result; - if (nodes.getLength() == 0) { - logger.debug("Filtering out : " + contextElresp); + if (nodes.getLength() > 0) { + // it matches then take all the contextAttributes + + continue; + + } + + Iterator j = contextElresp + .getContextElement().getContextAttributeList() + .iterator(); + while (j.hasNext()) { + ContextAttribute contextAttribute = j.next(); + doc = XmlFactory.stringToDocument(contextAttribute + .toString()); + result = expr.evaluate(doc, XPathConstants.NODESET); + + nodes = (NodeList) result; + if (nodes.getLength() == 0) { + if (logger.isDebugEnabled()) { + logger.debug("Filtering out : " + contextAttribute); + } + j.remove(); + } + } + + if (contextElresp.getContextElement().getContextAttributeList() + .isEmpty()) { + // no ContextAttribute, then remove + if (logger.isDebugEnabled()) { + logger.debug("Filtering out : " + contextElresp); + } i.remove(); } + } } catch (XPathExpressionException e) { logger.error("Xpath Exception", e); @@ -1345,7 +1405,7 @@ public UnsubscribeContextResponse unsubscribeContext( public UpdateContextResponse updateContext( final UpdateContextRequest request) { - UpdateContextResponse response = null; + UpdateContextResponse response = new UpdateContextResponse(); /* * Here we apply associations @@ -1358,26 +1418,40 @@ public UpdateContextResponse updateContext( updateContextRequest = request; } + Future historicalAgentFuture = null; + /* * Dump data in Historical Agent if present. */ if (request.getUpdateAction() != UpdateActionType.DELETE && BundleUtils.isServiceRegistered(this, embeddedIoTAgent)) { - executor.execute(new Thread() { + historicalAgentFuture = executor.submit(new Callable() { @Override - public void run() { + public StatusCode call() { + StatusCode statusCode = null; try { - embeddedIoTAgent.storeData(updateContextRequest - .getContextElement()); + + statusCode = embeddedIoTAgent + .storeData(updateContextRequest + .getContextElement()); } catch (org.springframework.osgi.service.ServiceUnavailableException e) { logger.warn("Not possible to store in the Big Data Repository: osgi service not registered"); + statusCode = new StatusCode(500, + ReasonPhrase.RECEIVERINTERNALERROR_500 + .toString(), + "Big Data Repository not present"); } + if (statusCode == null) { + statusCode = new StatusCode(500, + ReasonPhrase.RECEIVERINTERNALERROR_500 + .toString(), "Unknown reason"); + } + return statusCode; } }); - } /* @@ -1390,7 +1464,14 @@ public void run() { if (!BundleUtils.isServiceRegistered(this, embeddedIoTAgent) || !embeddedIoTAgent.isSubscriptionEnabled()) { - notifySubscribers(updateContextRequest); + executor.submit(new Thread() { + + @Override + public void run() { + notifySubscribers(updateContextRequest); + + } + }); } else { logger.info("EmbeddedAgent has its own Subscription system therefore SmartUpdateHandler will not be applied"); @@ -1417,86 +1498,207 @@ public void run() { } /* - * Here we forward notifications to pub sub url + * Here we forward notifications to pub/sub url or data sinks */ - response = fowardUpdateToIoTConsumers(request); - if (response == null) { - response = new UpdateContextResponse(new StatusCode( - Code.INTERNALERROR_500.getCode(), - ReasonPhrase.RECEIVERINTERNALERROR_500.toString(), - "Unspecified error during UpdateContext"), null); - } else if ((response.getContextElementResponse() == null || response - .getContextElementResponse().isEmpty()) - && (response.getErrorCode() == null || (response.getErrorCode() - .getCode() != Code.OK_200.getCode() && ignorePubSubFailure))) { - /* - * Here if the ContextElementResponse is empty AND the ErrorCode is - * null or there was an error - */ + // if (ignorePubSubFailure) { + // + // executor.submit(new Thread() { + // + // @Override + // public void run() { + // fowardUpdateToIoTConsumers(request); + // } + // }); + // + // response = new UpdateContextResponse(new StatusCode( + // Code.OK_200.getCode(), ReasonPhrase.OK_200.toString(), ""), + // null); + // + // } else { + // response = fowardUpdateToIoTConsumers(request); + // + // if (response == null) { + // response = new UpdateContextResponse(new StatusCode( + // Code.INTERNALERROR_500.getCode(), + // ReasonPhrase.RECEIVERINTERNALERROR_500.toString(), + // "Unspecified error during UpdateContext"), null); + // } else if ((response.getContextElementResponse() == null || response + // .getContextElementResponse().isEmpty()) + // && (response.getErrorCode() == null || (response + // .getErrorCode().getCode() != Code.OK_200.getCode() && + // ignorePubSubFailure))) { + // /* + // * Here if the ContextElementResponse is empty AND the ErrorCode + // * is null or there was an error + // */ + // + // response = new UpdateContextResponse(new StatusCode( + // Code.OK_200.getCode(), ReasonPhrase.OK_200.toString(), + // ""), null); + // } + // } + + List> futureResponseFromIoTConsumersList = null; + if (ignorePubSubFailure) { + + fowardUpdateToIoTConsumers(request); + + } else { + futureResponseFromIoTConsumersList = fowardUpdateToIoTConsumers(request); + } + + /* + * Merging of the various response from different actions + */ + if (futureResponseFromIoTConsumersList == null + || futureResponseFromIoTConsumersList.isEmpty()) { + + if (historicalAgentFuture == null) { + // nothing to check therefore everything is fine + response.setErrorCode(new StatusCode(Code.OK_200.getCode(), + ReasonPhrase.OK_200.toString(), "")); + } else { + // we need to check only if it was stored correctly in the + // historical agent + StatusCode statusCode; + try { + // this call is a blocking call + statusCode = historicalAgentFuture.get(); + } catch (InterruptedException e) { + logger.warn("Storing into historical agent was interrupted: " + + e.getCause()); + statusCode = new StatusCode(500, + ReasonPhrase.RECEIVERINTERNALERROR_500.toString(), + e.getCause().toString()); + } catch (ExecutionException e) { + logger.warn("Storing into historical agent encountered an exception: " + + e.getCause()); + statusCode = new StatusCode(500, + ReasonPhrase.RECEIVERINTERNALERROR_500.toString(), + e.getCause().toString()); + } + response.setErrorCode(statusCode); + } + + } else { + + if (historicalAgentFuture != null) { + + try { + response = new UpdateContextResponse( + historicalAgentFuture.get(), null); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + for (Future futureResponseFromIoTConsumer : futureResponseFromIoTConsumersList) { + + UpdateContextResponse futureResponse; + try { + + futureResponse = futureResponseFromIoTConsumer.get(); + + } catch (InterruptedException e) { + logger.warn("IoT Consumer connecting process was interrupted: " + + e.getCause()); + futureResponse = new UpdateContextResponse(); + futureResponse + .setContextElementResponse(new ArrayList()); + futureResponse + .getContextElementResponse() + .add(new ContextElementResponse( + new ContextElement(), + new StatusCode( + 500, + ReasonPhrase.RECEIVERINTERNALERROR_500 + .toString(), e.getCause() + .toString()))); + } catch (ExecutionException e) { + logger.warn("IoT Consumer connecting process encountered an exception: " + + e.getCause()); + futureResponse = new UpdateContextResponse(); + futureResponse + .setContextElementResponse(new ArrayList()); + futureResponse + .getContextElementResponse() + .add(new ContextElementResponse( + new ContextElement(), + new StatusCode( + 500, + ReasonPhrase.RECEIVERINTERNALERROR_500 + .toString(), e.getCause() + .toString()))); + } + + if (futureResponse != null + && futureResponse.getContextElementResponse() != null + && !futureResponse.getContextElementResponse() + .isEmpty()) { + + if (response.getContextElementResponse() != null) { + + response.getContextElementResponse().addAll( + futureResponse.getContextElementResponse()); + + } else { + + response.setContextElementResponse(futureResponse + .getContextElementResponse()); + + } + + } + + } - response = new UpdateContextResponse(new StatusCode( - Code.OK_200.getCode(), ReasonPhrase.OK_200.toString(), ""), - null); } return response; } - private UpdateContextResponse fowardUpdateToIoTConsumers( - UpdateContextRequest updateContextRequest) { + private List> fowardUpdateToIoTConsumers( + final UpdateContextRequest updateContextRequest) { - UpdateContextResponse response = null; + List> futureResponseList = new ArrayList>(); - boolean noPubSubUrl_ngsiv1_nle = false; + // boolean noPubSubUrl_ngsiv1_nle = false; /* * Forward to IoT Consumers which are compliant with NGSI-10.v1.nle */ if (pubSubUrlList_ngsiv1 != null && !pubSubUrlList_ngsiv1.isEmpty()) { - for (String url : pubSubUrlList_ngsiv1) { + for (final String url : pubSubUrlList_ngsiv1) { - if (url != null) { - logger.info("Started Contact pub/sub broker: " + url); + if (url != null && !url.isEmpty()) { + + futureResponseList.add(forwardUpdate(updateContextRequest, + url, StandardVersion.NGSI10_v1_nle)); - try { - response = ngsi10Requester.updateContext( - updateContextRequest, new URI(url), - StandardVersion.NGSI10_v1_nle); - } catch (URISyntaxException e) { - logger.info("Impossible to connect to the pub/sub broker: " - + url); - if (logger.isDebugEnabled()) { - logger.debug("URI Syntax Error", e); - } - } } - // TODO here the only the last response is taken into - // consideration as updateCotnextResponse. It would be - // necessary to have some rule (for example, ALL, - // ATLEASTONE, MOST, NOONE fault tolerant) + } } else if (pubSubUrl_ngsiv1 != null && !pubSubUrl_ngsiv1.isEmpty()) { - logger.info("Started to contact pub/sub broker: " - + pubSubUrl_ngsiv1); - - try { - response = ngsi10Requester.updateContext(updateContextRequest, - new URI(pubSubUrl_ngsiv1), - StandardVersion.NGSI10_v1_nle); - } catch (URISyntaxException e) { - logger.info("Impossible to connect to the pub/sub broker: " + if (logger.isDebugEnabled()) { + logger.debug("Started to contact pub/sub broker: " + pubSubUrl_ngsiv1); - if (logger.isDebugEnabled()) { - logger.debug("URI Syntax Error", e); - } } - } else { - noPubSubUrl_ngsiv1_nle = true; + + futureResponseList.add(forwardUpdate(updateContextRequest, + pubSubUrl_ngsiv1, StandardVersion.NGSI10_v1_nle)); + + // } else { + // noPubSubUrl_ngsiv1_nle = true; } - boolean noPubSubUrl_ngsiv1_tid = false; + // boolean noPubSubUrl_ngsiv1_tid = false; /* * Forward to IoT Consumers which are compliant with NGSI-10.v1.tid @@ -1505,53 +1707,186 @@ updateContextRequest, new URI(url), && !pubSubUrlList_ngsiv1_orion.isEmpty()) { for (String url : pubSubUrlList_ngsiv1_orion) { - if (url != null) { - logger.info("Started Contact pub/sub broker: " + url); - - try { - response = ngsi10Requester.updateContext( - updateContextRequest, new URI(url), - StandardVersion.NGSI10_v1_tid); - } catch (URISyntaxException e) { - logger.info("Impossible to connect to the pub/sub broker: " - + url); - if (logger.isDebugEnabled()) { - logger.debug("URI Syntax Error", e); - } - } + if (url != null && !url.isEmpty()) { + + futureResponseList.add(forwardUpdate(updateContextRequest, + url, StandardVersion.NGSI10_v1_tid)); + // + // if (logger.isDebugEnabled()) { + // logger.debug("Started Contact pub/sub broker: " + url); + // } + // + // try { + // response = ngsi10Requester.updateContext( + // updateContextRequest, new URI(url), + // StandardVersion.NGSI10_v1_tid); + // } catch (URISyntaxException e) { + // logger.info("Impossible to connect to the pub/sub broker: " + // + url); + // if (logger.isDebugEnabled()) { + // logger.debug("URI Syntax Error", e); + // } + // } } - // TODO here the only the last response is taken into - // consideration as updateCotnextResponse. It would be - // necessary to have some rule (for example, ALL, - // ATLEASTONE, MOST, NOONE fault tolerant) } } else if (pubSubUrl_ngsiv1_orion != null && !pubSubUrl_ngsiv1_orion.isEmpty()) { - logger.info("Started to contact pub/sub broker: " - + pubSubUrl_ngsiv1_orion); - try { - response = ngsi10Requester.updateContext(updateContextRequest, - new URI(pubSubUrl_ngsiv1_orion), - StandardVersion.NGSI10_v1_tid); - } catch (URISyntaxException e) { - logger.info("Impossible to connect to the pub/sub broker: " - + pubSubUrl_ngsiv1_orion); + futureResponseList.add(forwardUpdate(updateContextRequest, + pubSubUrl_ngsiv1_orion, StandardVersion.NGSI10_v1_tid)); + + // if (logger.isDebugEnabled()) { + // logger.debug("Started to contact pub/sub broker: " + // + pubSubUrl_ngsiv1_orion); + // } + // + // try { + // response = ngsi10Requester.updateContext(updateContextRequest, + // new URI(pubSubUrl_ngsiv1_orion), + // StandardVersion.NGSI10_v1_tid); + // } catch (URISyntaxException e) { + // logger.info("Impossible to connect to the pub/sub broker: " + // + pubSubUrl_ngsiv1_orion); + // if (logger.isDebugEnabled()) { + // logger.debug("URI Syntax Error", e); + // } + // } + // } else { + // noPubSubUrl_ngsiv1_tid = true; + } + + // if (noPubSubUrl_ngsiv1_nle && noPubSubUrl_ngsiv1_tid) { + // return new UpdateContextResponse(new StatusCode( + // Code.OK_200.getCode(), ReasonPhrase.OK_200.toString(), ""), + // null); + // } else { + // return response; + // } + + return futureResponseList; + + } + + private Future forwardUpdate( + final UpdateContextRequest updateContextRequest, final String url, + final StandardVersion standardVersion) { + + return executor.submit(new Callable() { + + @Override + public UpdateContextResponse call() throws Exception { + if (logger.isDebugEnabled()) { - logger.debug("URI Syntax Error", e); + logger.debug("Started Contact pub/sub broker: " + url); + } + UpdateContextResponse response = null; + + try { + + response = ngsi10Requester + .updateContext(updateContextRequest, new URI(url), + standardVersion); + } catch (URISyntaxException e) { + logger.info("Impossible to connect to the pub/sub broker: " + + url); + if (logger.isDebugEnabled()) { + logger.debug("URI Syntax Error", e); + } } + + /* + * hereinafter it is just some details injection in the + * statuscode + */ + if (response != null) { + + if (response.getContextElementResponse() != null + && !response.getContextElementResponse().isEmpty()) { + + for (ContextElementResponse contextElementResponse : response + .getContextElementResponse()) { + + if (contextElementResponse.getStatusCode() != null) { + + if (contextElementResponse.getStatusCode() + .getDetails() != null) { + + contextElementResponse + .getStatusCode() + .setDetails( + contextElementResponse + .getStatusCode() + .getDetails() + .toString() + + ". Processed by " + + url + + " with Standard: " + + standardVersion + .toString()); + } else { + contextElementResponse + .getStatusCode() + .setDetails( + "Processed by " + + url + + " with Standard: " + + standardVersion + .toString()); + } + } else { + contextElementResponse + .setStatusCode(new StatusCode(200, + ReasonPhrase.OK_200.toString(), + "Processed by: " + + url + + " with Standard: " + + standardVersion + .toString())); + } + } + + } else if (response.getErrorCode() != null) { + + if (response.getErrorCode().getDetails() != null) { + + response.getErrorCode().setDetails( + response.getErrorCode().getDetails() + .toString() + + ". Processed by " + + url + + " with Standard: " + + standardVersion.toString()); + } else { + response.getErrorCode().setDetails( + "Processed by " + url + " with Standard: " + + standardVersion.toString()); + } + + response.setContextElementResponse(new ArrayList()); + response.getContextElementResponse().add( + new ContextElementResponse( + new ContextElement(), response + .getErrorCode())); + + } else { + response.setContextElementResponse(new ArrayList()); + response.getContextElementResponse().add( + new ContextElementResponse( + new ContextElement(), new StatusCode( + 200, ReasonPhrase.OK_200 + .toString(), + "No response at all from: " + + url + + " with Standard: " + + standardVersion + .toString()))); + } + } + + return response; } - } else { - noPubSubUrl_ngsiv1_tid = true; - } - if (noPubSubUrl_ngsiv1_nle && noPubSubUrl_ngsiv1_tid) { - return new UpdateContextResponse(new StatusCode( - Code.OK_200.getCode(), ReasonPhrase.OK_200.toString(), ""), - null); - } else { - return response; - } + }); } diff --git a/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/subscription/SubscriptionController.java b/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/subscription/SubscriptionController.java index 31d087cf..215dc31f 100644 --- a/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/subscription/SubscriptionController.java +++ b/eu.neclab.iotplatform.iotbroker.core/src/main/java/eu/neclab/iotplatform/iotbroker/core/subscription/SubscriptionController.java @@ -1263,11 +1263,13 @@ private List modifyEntityAttributeBasedAssociation( public NotifyContextResponse receiveReqFrmAgentWrapper( NotifyContextRequest ncReq) { - logger.info("SubscriptionController: Received the following notify context request:" - + ncReq.toString() - + "/n" - + "SubscriptionID is: " - + ncReq.getSubscriptionId()); + if (logger.isDebugEnabled()) { + logger.debug("SubscriptionController: Received the following notify context request:" + + ncReq.toString() + + "/n" + + "SubscriptionID is: " + + ncReq.getSubscriptionId()); + } /* * Retrieves the list of incoming subscription ids that are relevant for @@ -1523,8 +1525,9 @@ public NotifyContextResponse receiveReqFrmAgentWrapper( for (ContextElementResponse contextElementResponse : qCRes_forMerger .getListContextElementResponse()) { - logger.info("###########################################"); - logger.info(contextElementResponse); + if (logger.isDebugEnabled()) { + logger.debug(contextElementResponse); + } // ContextElementResponse contextElementResponse = // qCRes_forMerger diff --git a/eu.neclab.iotplatform.iotbroker.core/src/test/java/eu/neclab/iotplatform/iotbroker/core/junittests/tests/TestFunctionality.java b/eu.neclab.iotplatform.iotbroker.core/src/test/java/eu/neclab/iotplatform/iotbroker/core/junittests/tests/TestFunctionality.java index 85607c14..030fd57c 100644 --- a/eu.neclab.iotplatform.iotbroker.core/src/test/java/eu/neclab/iotplatform/iotbroker/core/junittests/tests/TestFunctionality.java +++ b/eu.neclab.iotplatform.iotbroker.core/src/test/java/eu/neclab/iotplatform/iotbroker/core/junittests/tests/TestFunctionality.java @@ -150,6 +150,9 @@ public class TestFunctionality { UpdateContextResponse updateResp = (UpdateContextResponse) XmlFactory.convertFileToXML("src/test/resources/updateResp.xml", UpdateContextResponse.class); + UpdateContextResponse updateRespTraceByIoTBroker = (UpdateContextResponse) + XmlFactory.convertFileToXML("src/test/resources/updateRespTraceByIoTBroker.xml", UpdateContextResponse.class); + @Before public void before(){ @@ -167,6 +170,7 @@ public void before(){ core.setConfManWrapper(confManWrapper); core.setNorthBoundWrapper(northBoundWrapper); core.enableAssociations(true); + core.setUpdateThreadPoolSize(20); subscriptionController.setAgentWrapper(agentWrapper); subscriptionController.setConfManWrapper(confManWrapper); @@ -215,9 +219,6 @@ public void restrictionTest(){ //execute the test QueryContextResponse brokerResp = core.queryContext(queryReq_attribExpr); - logger.info("ADFASDFADFSSDFASDFASDFASDFASDF"+brokerResp); - - //verify the communication of core EasyMock.verify(ngsi9InterfaceMock); EasyMock.verify(ngsi10RequesterMock); @@ -325,8 +326,9 @@ public void updateTest(){ //verify the communication of core EasyMock.verify(ngsi9InterfaceMock); EasyMock.verify(ngsi10RequesterMock); + - assertEquals(updateResp,brokerResp); + assertEquals(updateRespTraceByIoTBroker.toJsonString(),brokerResp.toJsonString()); logger.info("Successfully tested FIWARE.Feature.IoT.BackendIoTBroker.Update.IdBased"); } diff --git a/eu.neclab.iotplatform.iotbroker.core/src/test/resources/updateRespTraceByIoTBroker.xml b/eu.neclab.iotplatform.iotbroker.core/src/test/resources/updateRespTraceByIoTBroker.xml new file mode 100644 index 00000000..46c29b03 --- /dev/null +++ b/eu.neclab.iotplatform.iotbroker.core/src/test/resources/updateRespTraceByIoTBroker.xml @@ -0,0 +1,17 @@ + + + + + + + Sensor5 + + + + 200 + Ok +
a. Processed by http://192.168.100.1:70/application with Standard: NGSI10_v1_nle
+
+
+
+
\ No newline at end of file diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/META-INF/spring/embeddediotagentcore-context.xml b/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/META-INF/spring/embeddediotagentcore-context.xml index 73e8d292..94e5aa96 100644 --- a/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/META-INF/spring/embeddediotagentcore-context.xml +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/META-INF/spring/embeddediotagentcore-context.xml @@ -13,4 +13,12 @@ + + + + + + \ No newline at end of file diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/agentcore/IoTAgentCore.java b/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/agentcore/IoTAgentCore.java index 9f06e69c..906b369e 100644 --- a/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/agentcore/IoTAgentCore.java +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.core/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/agentcore/IoTAgentCore.java @@ -47,8 +47,9 @@ import java.net.URI; import java.util.ArrayList; import java.util.Date; -import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -56,6 +57,7 @@ import javax.annotation.PostConstruct; import org.apache.log4j.Logger; +import org.springframework.beans.factory.annotation.Value; import eu.neclab.iotplatform.iotbroker.commons.BundleUtils; import eu.neclab.iotplatform.iotbroker.commons.interfaces.EmbeddedAgentRegistryInterface; @@ -67,6 +69,8 @@ import eu.neclab.iotplatform.ngsi.api.datamodel.ContextRegistration; import eu.neclab.iotplatform.ngsi.api.datamodel.DiscoverContextAvailabilityResponse; import eu.neclab.iotplatform.ngsi.api.datamodel.EntityId; +import eu.neclab.iotplatform.ngsi.api.datamodel.ReasonPhrase; +import eu.neclab.iotplatform.ngsi.api.datamodel.StatusCode; import eu.neclab.iotplatform.ngsi.api.datamodel.SubscribeContextRequest; import eu.neclab.iotplatform.ngsi.api.ngsi10.Ngsi10Interface; @@ -75,6 +79,9 @@ public class IoTAgentCore implements IoTAgentInterface { /** The logger. */ private static Logger logger = Logger.getLogger(IoTAgentCore.class); + @Value("${storeOnlyLatestValue:false}") + private boolean storeOnlyLatestValue; + private EmbeddedAgentStorageInterface iotAgentStorage; private EmbeddedAgentSubscriptionHandlerInterface subscriptionHandler; @@ -152,100 +159,156 @@ public void run() { } @Override - public void storeData(final List contextElementList) { - - // List of Task - List> tasks = new ArrayList>(); - - Iterator iter = contextElementList.iterator(); - while (iter.hasNext()) { + public StatusCode storeData(final List contextElementList) { - ContextElement contextElement = iter.next(); + List isolatedContextElementList = new ArrayList(); - // Create a list of ContextElement where each ContextElement has - // exactly one ContextAttribute - // - // TODO change the isolatedContextElement to a new class called - // AtomicContextElement - final List isolatedContextElementList = this - .isolateAttributes(contextElement); - - final Date localDate = new Date(); - - // Iterate over the isolatedContextElement list in order to create - // the tasks used to store data - for (final ContextElement isolatedContextElement : isolatedContextElementList) { + for (ContextElement contextElement : contextElementList) { + isolatedContextElementList.addAll(this + .isolateAttributes(contextElement)); + } - tasks.add(Executors.callable(new Runnable() { + // List of Task + // List> tasks = new ArrayList>(); - @Override - public void run() { + // Iterator iter = contextElementList.iterator(); - if (logger.isDebugEnabled()) { - logger.debug(String.format( - "Storing historically contextElement %s", - isolatedContextElement.toJsonString())); - } - iotAgentStorage.storeHistoricalData( - isolatedContextElement, localDate); + // List contextElementUnsuccessfullyStored = new + // ArrayList(); - if (logger.isDebugEnabled()) { - logger.debug(String.format( - "Storing latest contextElement %s", - isolatedContextElement.toJsonString())); - } - iotAgentStorage.storeLatestData(isolatedContextElement); + // while (iter.hasNext()) { - } + // ContextElement contextElement = iter.next(); - })); + // Create a list of ContextElement where each ContextElement has + // exactly one ContextAttribute + // + // TODO change the isolatedContextElement to a new class called + // AtomicContextElement + // final List isolatedContextElementList = this + // .isolateAttributes(contextElement); - } + final Date localDate = new Date(); - if (BundleUtils.isServiceRegistered(this, subscriptionHandler)) { - tasks.add(Executors.callable(new Runnable() { + if (logger.isDebugEnabled()) { + logger.debug(String.format("Storing contextElement %s", + isolatedContextElementList)); + } + Map successfulMap; + + if (storeOnlyLatestValue) { + successfulMap = iotAgentStorage.storeData( + isolatedContextElementList, null, + localDate); + } else { + successfulMap = iotAgentStorage.storeData( + isolatedContextElementList, isolatedContextElementList, + localDate); + } - @Override - public void run() { - subscriptionHandler - .checkSubscriptions(isolatedContextElementList); - } + // // Iterate over the isolatedContextElement list in order to create + // // the tasks used to store data + // for (final ContextElement isolatedContextElement : + // isolatedContextElementList) { + // + // // tasks.add(Executors.callable(new Runnable() { + // + // // @Override + // // public void run() { + // + // if (logger.isDebugEnabled()) { + // logger.debug(String.format( + // "Storing historically contextElement %s", + // isolatedContextElement.toJsonString())); + // } + // + // iotAgentStorage.storeHistoricalData(isolatedContextElement, + // localDate); + // + // if (logger.isDebugEnabled()) { + // logger.debug(String.format("Storing latest contextElement %s", + // isolatedContextElement.toJsonString())); + // } + // + // boolean successfullyStored = iotAgentStorage + // .storeLatestData(isolatedContextElement); + // if (!successfullyStored) { + // contextElementUnsuccessfullyStored.add(isolatedContextElement); + // } + // + // // } + // + // // })); + // + // } - })); - } - } + // if (BundleUtils.isServiceRegistered(this, subscriptionHandler)) { + // // tasks.add(Executors.callable(new Runnable() { + // + // // @Override + // // public void run() { + // subscriptionHandler + // .checkSubscriptions(isolatedContextElementList); + // // } + // + // // })); + // } + // } // Execute the tasks used to store the data - ExecutorService taskExecutor = Executors.newCachedThreadPool(); - try { - taskExecutor.invokeAll(tasks); - } catch (InterruptedException e) { - e.printStackTrace(); - } + // ExecutorService taskExecutor = Executors.newCachedThreadPool(); + // try { + // taskExecutor.invokeAll(tasks); + // } catch (InterruptedException e) { + // e.printStackTrace(); + // } /* * Check registration */ if (BundleUtils.isServiceRegistered(this, registrationHandler)) { - tasks.add(Executors.callable(new Runnable() { + // tasks.add(Executors.callable(new Runnable() { - @Override - public void run() { - registrationHandler.checkRegistration(contextElementList); + // @Override + // public void run() { + registrationHandler.checkRegistration(contextElementList); - } - - })); - taskExecutor = Executors.newCachedThreadPool(); - try { - taskExecutor.invokeAll(tasks); - } catch (InterruptedException e) { - e.printStackTrace(); - } + // } + // + // })); + // taskExecutor = Executors.newCachedThreadPool(); + // try { + // taskExecutor.invokeAll(tasks); + // } catch (InterruptedException e) { + // e.printStackTrace(); + // } } else if (logger.isDebugEnabled()) { logger.debug("Registry not available"); } + StringBuffer details = new StringBuffer(); + for (Entry success : successfulMap.entrySet()) { + if (!success.getValue()) { + details.append("EntityId+Attribute not successfully stored: " + + success.getKey().toJsonString() + ". "); + } + } + + // if (contextElementUnsuccessfullyStored.isEmpty()) { + if (details.length() == 0) { + return new StatusCode(200, ReasonPhrase.OK_200.toString(), ""); + + } else { + + // for (ContextElement isolatedContextElement : + // contextElementUnsuccessfullyStored) { + // details.append(isolatedContextElement.toJsonString() + "; "); + // } + return new StatusCode(472, + ReasonPhrase.INVALIDPARAMETER_472.toString(), + details.toString()); + } + } /** diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/pom.xml b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/pom.xml index 5d4afa44..9f9c0d3d 100644 --- a/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/pom.xml +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/pom.xml @@ -40,15 +40,15 @@ META-INF eu.neclab.iotplatform.couchdb.* - - - - - - - - - + + + + + + + + + @@ -120,6 +120,11 @@ 2.2.11 + + com.github.ben-manes.caffeine + caffeine + 2.5.2 + diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDB.java b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDB.java index c73dc713..1bb2a642 100644 --- a/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDB.java +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDB.java @@ -55,6 +55,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; @@ -63,6 +66,9 @@ import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Value; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.gson.JsonArray; @@ -91,6 +97,8 @@ public class CouchDB implements KeyValueStoreInterface, private Map permanentRegistryRevById = new HashMap(); + private LoadingCache lastValueCache; + private String couchDB_ip = null; private String authentication = null; @@ -112,7 +120,7 @@ public class CouchDB implements KeyValueStoreInterface, @Value("${couchdb_password:null}") private String couchDB_PASSWORD; - @Value("${registryDB_NAME:embeddedagentregistrydb}") + @Value("${registrydb_name:embeddedagentregistrydb}") private String registryDB_NAME; public String getCouchDB_HOST() { @@ -151,16 +159,16 @@ public String getUSERNAME() { return couchDB_USERNAME; } - public void setUSERNAME(String uSERNAME) { - couchDB_USERNAME = uSERNAME; + public void setUSERNAME(String username) { + couchDB_USERNAME = username; } public String getPASSWORD() { return couchDB_PASSWORD; } - public void setPASSWORD(String pASSWORD) { - couchDB_PASSWORD = pASSWORD; + public void setPASSWORD(String password) { + couchDB_PASSWORD = password; } public String getKeyToCachePrefix() { @@ -228,11 +236,33 @@ private void postConstruct() { databaseExist = true; } - this.cacheRevisionById(); + lastValueCache = CacheBuilder.newBuilder().maximumSize(1_000_000) + .expireAfterAccess(10, TimeUnit.MINUTES) + .build(new CacheLoader() { + + @Override + public ContextElement load(String key) throws Exception { + return getValuefromCouchDB(key); + } + + @Override + public Map loadAll( + Iterable keys) + throws Exception { + return getValuesFromCouchDB(keys); + } + + }); + + this.cacheLastValue(); + if (logger.isDebugEnabled()) { logger.debug("Cached revisions: " + cachedRevisionByKey); } + logger.info("All caches initialized. LastValueCache elements: " + + lastValueCache.size()); + } catch (MalformedURLException e) { e.printStackTrace(); } @@ -251,7 +281,11 @@ public boolean updateValue(String key, ContextElement contextElement) { * Get the document revision */ String revision = cachedRevisionByKey.get(key); + try { + + String encodedKey = URLEncoder.encode(key, "UTF-8"); + if (revision == null) { // Revision is null because not cached before. It means that @@ -261,7 +295,8 @@ public boolean updateValue(String key, ContextElement contextElement) { FullHttpResponse respFromCouchDB; respFromCouchDB = HttpRequester.sendPut(new URL(getCouchDB_ip() - + couchDB_NAME + "/" + key), value, "application/json"); + + couchDB_NAME + "/" + encodedKey), value, + "application/json"); if (respFromCouchDB == null) { @@ -284,6 +319,8 @@ public boolean updateValue(String key, ContextElement contextElement) { // Update the cache cachedRevisionByKey.put(key, revision); + lastValueCache.put(key, contextElement); + successful = true; } @@ -294,8 +331,8 @@ public boolean updateValue(String key, ContextElement contextElement) { // Update the document FullHttpResponse respFromCouchDB = HttpRequester.sendPut( - new URL(getCouchDB_ip() + couchDB_NAME + "/" + key), - messageBody, "application/json"); + new URL(getCouchDB_ip() + couchDB_NAME + "/" + + encodedKey), messageBody, "application/json"); if (respFromCouchDB.getStatusLine().getStatusCode() > 299) { @@ -312,6 +349,8 @@ public boolean updateValue(String key, ContextElement contextElement) { // Put in cache cachedRevisionByKey.put(key, revision); + lastValueCache.put(key, contextElement); + successful = true; } @@ -375,20 +414,28 @@ private void checkDB() { @Override public boolean storeValue(String key, ContextElement contextElement) { + return this.storeValue(key, contextElement, false); + } + + @Override + public boolean storeValue(String key, ContextElement contextElement, + boolean cacheAfterStoring) { + this.checkDB(); boolean successful = false; try { - FullHttpResponse respFromCouchDB = HttpRequester.sendPut(new URL( - getCouchDB_ip() + couchDB_NAME + "/" + key), contextElement - .toJsonString(), "application/json"); + FullHttpResponse respFromCouchDB = HttpRequester.sendPut( + new URL(getCouchDB_ip() + couchDB_NAME + "/" + + URLEncoder.encode(key, "UTF-8")), + contextElement.toJsonString(), "application/json"); if (respFromCouchDB.getStatusLine().getStatusCode() > 299) { logger.warn("CouchDB database: " + couchDB_NAME - + " did not update correctly the value with key: " - + key + " . Reason: " + respFromCouchDB.getStatusLine()); + + " did not store correctly the value with key: " + key + + " . Reason: " + respFromCouchDB.getStatusLine()); successful = false; @@ -396,6 +443,10 @@ public boolean storeValue(String key, ContextElement contextElement) { successful = true; + if (cacheAfterStoring) { + lastValueCache.put(key, contextElement); + } + } } catch (MalformedURLException e) { @@ -407,7 +458,166 @@ public boolean storeValue(String key, ContextElement contextElement) { return successful; } - private void cacheRevisionById() { + @Override + public Map storeAndUpdateValues( + Map keyValuesToStore, + Map keyValuesToUpdate, + boolean cacheAfterStoring) { + + this.checkDB(); + + // boolean successful = false; + Map successfulMap = new HashMap(); + + String body = generateBulkStoreBody(keyValuesToStore, keyValuesToUpdate); + + try { + FullHttpResponse respFromCouchDB = HttpRequester.sendPost(new URL( + getCouchDB_ip() + couchDB_NAME + "/_bulk_docs"), body, + "application/json"); + + if (respFromCouchDB.getStatusLine().getStatusCode() > 299) { + + logger.warn("CouchDB database: " + couchDB_NAME + + " did not store correctly the values"); + + // successful = false; + + } + + if (respFromCouchDB.getBody() != null) { + + CouchDBBulkStoreResponse parseResp = CouchDBUtil + .parseRevisionsFromCouchdbResponse(respFromCouchDB); + + for (Entry entry : parseResp.getIdAndRevision() + .entrySet()) { + + successfulMap.put(entry.getKey(), true); + + if (keyValuesToUpdate.containsKey(entry.getKey()) + || cacheAfterStoring) { + + cachedRevisionByKey.put(entry.getKey(), + entry.getValue()); + + if (keyValuesToUpdate.containsKey(entry.getKey())) { + lastValueCache.put(entry.getKey(), + keyValuesToUpdate.get(entry.getKey())); + + } else if (keyValuesToStore.containsKey(entry.getKey())) { + + lastValueCache.put(entry.getKey(), + keyValuesToUpdate.get(entry.getKey())); + } else { + logger.warn("CouchDB database: " + couchDB_NAME + + " stored a non-requested value" + + entry.getKey()); + } + + } + } + + for (Entry entry : parseResp + .getErrorInsertion().entrySet()) { + + successfulMap.put(entry.getKey(), false); + + if (keyValuesToUpdate.containsKey(entry.getKey()) + || cacheAfterStoring) { + + logger.warn("CouchDB database: " + couchDB_NAME + + " did not store correctly the value " + + entry.getKey() + " Reason: " + + entry.getValue()); + } + } + + // if (parseResp.getErrorInsertion().size() != 0) { + // successful = false; + // } else { + // successful = true; + // } + + } else { + for (String key : keyValuesToStore.keySet()) { + successfulMap.put(key, false); + } + + for (String key : keyValuesToUpdate.keySet()) { + successfulMap.put(key, false); + } + + } + + } catch (MalformedURLException e) { + logger.info("Impossible to store information into CouchDB", e); + } catch (Exception e) { + e.printStackTrace(); + } + + return successfulMap; + } + + private String generateBulkStoreBody( + Map keyValuesToStore, + Map keyValuesToUpdate) { + StringBuffer body = new StringBuffer(); + body.append("{\"docs\": ["); + + boolean first = true; + for (Entry keyValueToStore : keyValuesToStore + .entrySet()) { + + if (!first) { + body.append(","); + } else { + first = false; + } + String doc = keyValueToStore.getValue().toJsonString(); + try { + doc = doc.replaceFirst("\\{", + "\\{\"_id\":\"" + keyValueToStore.getKey() + "\","); + } catch (Exception e) { + e.printStackTrace(); + } + body.append(doc); + } + + for (Entry keyValueToUpdate : keyValuesToUpdate + .entrySet()) { + + if (!first) { + body.append(","); + } else { + first = false; + } + + String revision = cachedRevisionByKey + .get(keyValueToUpdate.getKey()); + String doc = keyValueToUpdate.getValue().toJsonString(); + + if (revision == null || revision.isEmpty()) { + // logger.warn("Revision not found for key: " + // + keyValueToUpdate.getKey()); + // continue; + doc = doc.replaceFirst("\\{", + "{\"_id\":\"" + keyValueToUpdate.getKey() + "\","); + } else { + doc = doc.replaceFirst("\\{", + "{\"_id\":\"" + keyValueToUpdate.getKey() + + "\",\"_rev\":\"" + revision + "\","); + } + body.append(doc); + + } + body.append("]}"); + + return body.toString(); + + } + + private void cacheLastValue() { this.checkDB(); @@ -416,7 +626,7 @@ private void cacheRevisionById() { String url = null; try { url = String - .format("%s%s/_all_docs?startkey=%%22%s%%22&endkey=%%22%s%%C3%%BF%%22", + .format("%s%s/_all_docs?startkey=%%22%s%%22&endkey=%%22%s%%C3%%BF%%22&include_docs=true", getCouchDB_ip(), couchDB_NAME, keyToCachePrefix, keyToCachePrefix); } catch (Exception e) { @@ -450,8 +660,27 @@ private void cacheRevisionById() { String key = row.get("key").getAsString(); + // try { + // key = URLDecoder.decode(key, "UTF-8"); + // } catch (UnsupportedEncodingException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + cachedRevisionByKey.put(key, rev); + // Parse the ContextElement + if (row.get("doc") != null) { + ContextElement contextElement = (ContextElement) NgsiStructure + .parseStringToJson(row.get("doc") + .toString(), ContextElement.class); + + lastValueCache.put(key, contextElement); + } else { + logger.warn("Inconsistency in CouchDB: " + + row.toString()); + } + } } } @@ -470,13 +699,16 @@ public Collection getKeys(String startKey, String endKey) { this.checkDB(); - String url = String.format( - "%s%s/_all_docs?startkey=%%22%s%%22&endkey=%%22%s%%22", - getCouchDB_ip(), couchDB_NAME, startKey, endKey); - Collection keys = new ArrayList(); try { + + String url = String.format( + "%s%s/_all_docs?startkey=%%22%s%%22&endkey=%%22%s%%22", + getCouchDB_ip(), couchDB_NAME, + URLEncoder.encode(startKey, "UTF-8"), + URLEncoder.encode(endKey, "UTF-8")); + FullHttpResponse response = HttpRequester.sendGet(new URL(url)); if (response == null) { @@ -508,13 +740,16 @@ public Collection getKeys(String startKey, String endKey) { } } } + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Keys for %s parsed response %s", + url, keys)); + } } catch (MalformedURLException e) { e.printStackTrace(); - } - - if (logger.isDebugEnabled()) { - logger.debug(String.format("Keys for %s parsed response %s", url, - keys)); + } catch (UnsupportedEncodingException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } return keys; @@ -576,16 +811,15 @@ public Multimap getIdsByType() { return idsByType; } - public ContextElement getValue(String latestValueDocumentKey) { + private ContextElement getValuefromCouchDB(String latestValueDocumentKey) { + this.checkDB(); ContextElement contextElement = null; try { String url = getCouchDB_ip() + couchDB_NAME + "/" - + latestValueDocumentKey; - - // System.out.println("Requesting Url:" + url); + + URLEncoder.encode(latestValueDocumentKey, "UTF-8"); FullHttpResponse httpResponse = HttpRequester.sendGet(new URL(url)); @@ -599,9 +833,6 @@ public ContextElement getValue(String latestValueDocumentKey) { contextElement = (ContextElement) NgsiStructure .parseStringToJson(httpResponse.getBody(), ContextElement.class); - - // System.out.println("PARSED:" + contextElement); - } } @@ -610,7 +841,16 @@ public ContextElement getValue(String latestValueDocumentKey) { } return contextElement; + } + public ContextElement getValue(String latestValueDocumentKey) { + try { + return lastValueCache.get(latestValueDocumentKey); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; } public ContextElement getValues(String startKey, String endKey) { @@ -646,7 +886,18 @@ public ContextElement getValues(String startKey, String endKey) { return historicalContextElement; } - public List getValues(List keys) { + public Collection getValues(List keys) { + try { + return lastValueCache.getAll(keys).values(); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + } + + private Map getValuesFromCouchDB( + Iterable keys) { // curl -d '{"keys":["bar","baz"]}' -X POST // http://127.0.0.1:5984/foo/_all_docs?include_docs=true @@ -656,20 +907,23 @@ public List getValues(List keys) { boolean first = true; for (String key : keys) { if (first) { - try { - body.append("\"" + URLDecoder.decode(key, "UTF-8") + "\""); - } catch (UnsupportedEncodingException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + // try { + // body.append("\"" + URLDecoder.decode(key, "UTF-8") + "\""); + // } catch (UnsupportedEncodingException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + body.append("\"" + key + "\""); first = false; } else { - try { - body.append(",\"" + URLDecoder.decode(key, "UTF-8") + "\""); - } catch (UnsupportedEncodingException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + // try { + // body.append(",\"" + URLDecoder.decode(key, "UTF-8") + "\""); + // } catch (UnsupportedEncodingException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + body.append(",\"" + key + "\""); + } } body.append("]}"); @@ -681,7 +935,7 @@ public List getValues(List keys) { String url = getCouchDB_ip() + "/" + couchDB_NAME + "/_all_docs?include_docs=true"; - List contextElements = new ArrayList(); + Map contextElements = new HashMap(); try { FullHttpResponse response = HttpRequester.sendPost(new URL(url), @@ -696,7 +950,7 @@ public List getValues(List keys) { } else { - contextElements = getContextElements(response.getBody()); + contextElements = parseContextElements(response.getBody()); } } catch (MalformedURLException e) { @@ -767,9 +1021,17 @@ private String generateRangeQueryString(String startKey, String endKey) { // "startkey=%22obs_urn:x-iot:smartsantander:1:10006|2015-05-08%2016:00:00%22&endkey=%22obs_urn:x-iot:smartsantander:1:10006|2015-05-08%2017:00:00%22&include_docs=true" - String queryString = new String(String.format( - "startkey=%%22%s%%22&endkey=%%22%s%%22&include_docs=true", - startKey, endKey)); + String queryString = null; + try { + queryString = new String(String.format( + "startkey=%%22%s%%22&endkey=%%22%s%%22&include_docs=true", + // startKey, endKey)); + URLEncoder.encode(startKey, "UTF-8"), + URLEncoder.encode(endKey, "UTF-8"))); + } catch (UnsupportedEncodingException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } return queryString; @@ -807,9 +1069,10 @@ private ContextElement getHistoricalContextElement(String couchDBResultSet) { return historicalContextElement; } - private List getContextElements(String couchDBResultSet) { + private Map parseContextElements( + String couchDBResultSet) { - List contextElements = new ArrayList(); + Map contextElements = new HashMap(); JsonElement jelement = new JsonParser().parse(couchDBResultSet); if (!jelement.isJsonNull()) { @@ -824,10 +1087,19 @@ private List getContextElements(String couchDBResultSet) { // Parse the ContextElement if (row.get("doc") != null) { + String id = row.get("key").getAsString(); + + // try { + // id = URLDecoder.decode(id, "UTF-8"); + // } catch (UnsupportedEncodingException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + ContextElement contextElement = (ContextElement) NgsiStructure .parseStringToJson(row.get("doc").toString(), ContextElement.class); - contextElements.add(contextElement); + contextElements.put(id, contextElement); } else { logger.warn("Inconsistency in CouchDB: " + row.toString()); } @@ -921,8 +1193,9 @@ public void storeRegistration(String id, RegisterContextRequest registration) { registrationString = "{\"registration\":\"" + registrationString + "\"}"; - FullHttpResponse respFromCouchDB = HttpRequester.sendPut(new URL( - getCouchDB_ip() + registryDB_NAME + "/" + id), + FullHttpResponse respFromCouchDB = HttpRequester.sendPut( + new URL(getCouchDB_ip() + registryDB_NAME + "/" + + URLEncoder.encode(id, "UTF-8")), registrationString, "application/json"); if (logger.isDebugEnabled()) { @@ -979,8 +1252,8 @@ public void deleteRegistration(String id) { } FullHttpResponse response = HttpRequester.sendDelete(new URL( - getCouchDB_ip() + registryDB_NAME + "/" + id + "?rev=" - + rev)); + getCouchDB_ip() + registryDB_NAME + "/" + + URLEncoder.encode(id, "UTF-8") + "?rev=" + rev)); if (response == null) { diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDBBulkStoreResponse.java b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDBBulkStoreResponse.java new file mode 100644 index 00000000..67a5fa89 --- /dev/null +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDBBulkStoreResponse.java @@ -0,0 +1,27 @@ +package eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb; + +import java.util.HashMap; +import java.util.Map; + +public class CouchDBBulkStoreResponse { + + private Map idAndRevision = new HashMap(); + private Map errorInsertion = new HashMap(); + + public Map getIdAndRevision() { + return idAndRevision; + } + + public void setIdAndRevision(Map idAndRevision) { + this.idAndRevision = idAndRevision; + } + + public Map getErrorInsertion() { + return errorInsertion; + } + + public void setErrorInsertion(Map errorInsertion) { + this.errorInsertion = errorInsertion; + } + +} diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDBUtil.java b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDBUtil.java index b70e27b5..8c0ea487 100644 --- a/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDBUtil.java +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/couchdb/CouchDBUtil.java @@ -42,174 +42,211 @@ * DAMAGE. ******************************************************************************/ -package eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb; - -import java.net.MalformedURLException; -import java.net.URL; - -import org.apache.log4j.Logger; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - -import eu.neclab.iotplatform.iotbroker.commons.FullHttpResponse; - -public class CouchDBUtil { - - /** The logger. */ - private static Logger logger = Logger.getLogger(CouchDBUtil.class); - - public CouchDBUtil() { - } - - - /** - * Function that checks if the DB exist otherwise it will call a function - * that create the DB inside CouchDB - * - * @throws MalformedURLException - */ - public static boolean checkDB(String couchDB_IP, String couchDB_NAME, - String authentication) throws MalformedURLException { - - boolean resp = false; - - // String response = Client.sendRequest(new URL(couchDB_IP + - // "_all_dbs"), - // "GET", null, "application/json", authentication); - - FullHttpResponse response = HttpRequester.sendGet(new URL(couchDB_IP - + "_all_dbs")); - - if (response == null) { - logger.error("No response from CouchDB!!!!"); - return false; - } - - logger.info(" Response from CouchDB -----> " + response); - - if (response.getBody().contains(couchDB_NAME)) { - - logger.info("Database " + couchDB_NAME + " already exist!"); - resp = true; - } - - return resp; - - } - - /** - * Function that create the DB inside CouchDB - * - * @throws MalformedURLException - */ - public static boolean createDb(String couchDB_IP, String couchDB_NAME, - String authentication) throws MalformedURLException { - - FullHttpResponse response; - try { - response = HttpRequester.sendPut( - new URL(couchDB_IP + couchDB_NAME), "", "application/json"); - - logger.debug("Response to create_db:" + response); - if (response != null - && response.getStatusLine() != null - && (response.getStatusLine().getStatusCode() == 200 || response - .getStatusLine().getStatusCode() == 201)) { - return true; - - } else { - return false; - } - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - return false; - - } - - } - - public static String parseRevisionFromCouchdbResponse( - FullHttpResponse fullHttpResponse) { - - String responseBody = fullHttpResponse.getBody(); - - JsonParser parser = new JsonParser(); - JsonObject o = (JsonObject) parser.parse(responseBody); - - return o.get("rev").getAsString().replaceAll("\"+", ""); - } - - /** - * Function that delete the DB inside CouchDB - * - * @throws MalformedURLException - */ - public static boolean deleteDb(String couchDB_IP, String couchDB_NAME, - String authentication) throws MalformedURLException { - - FullHttpResponse response; - try { - - response = HttpRequester.sendDelete(new URL(couchDB_IP - + couchDB_NAME)); - - logger.debug("Response to deleteDb:" + response); - - if (response != null - && response.getStatusLine() != null - && (response.getStatusLine().getStatusCode() == 200 || response - .getStatusLine().getStatusCode() == 204)) { - return true; - - } else { - return false; - } - - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - return false; - } - - } - - public static void checkViews(String couchDB_IP, String couchDB_NAME) { - - for (View view : View.values()) { - - try { - URL url = new URL(couchDB_IP + couchDB_NAME + "/" - + view.getPath()); - - // Check if the view is there - FullHttpResponse response = HttpRequester.sendGet(url); - - if (response == null) { - logger.error("CouchDB server is not reachable!!!!"); - } else { - - // Store view if not present in the database - if (response.getStatusLine().getStatusCode() == 404) { - logger.info("View missing: " + view.getPath()); - - String jsView = view.getReadyToStoreView(); - - HttpRequester.sendPut(url, jsView, "application/json"); - } - } - - } catch (MalformedURLException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - - } - -} +package eu.neclab.iotplatform.iotbroker.embeddediotagent.couchdb; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Logger; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import eu.neclab.iotplatform.iotbroker.commons.FullHttpResponse; + +public class CouchDBUtil { + + /** The logger. */ + private static Logger logger = Logger.getLogger(CouchDBUtil.class); + + public CouchDBUtil() { + } + + /** + * Function that checks if the DB exist otherwise it will call a function + * that create the DB inside CouchDB + * + * @throws MalformedURLException + */ + public static boolean checkDB(String couchDB_IP, String couchDB_NAME, + String authentication) throws MalformedURLException { + + boolean resp = false; + + // String response = Client.sendRequest(new URL(couchDB_IP + + // "_all_dbs"), + // "GET", null, "application/json", authentication); + + FullHttpResponse response = HttpRequester.sendGet(new URL(couchDB_IP + + "_all_dbs")); + + if (response == null) { + logger.error("No response from CouchDB!!!!"); + return false; + } + + logger.info(" Response from CouchDB -----> " + response); + + if (response.getBody().contains(couchDB_NAME)) { + + logger.info("Database " + couchDB_NAME + " already exist!"); + resp = true; + } + + return resp; + + } + + /** + * Function that create the DB inside CouchDB + * + * @throws MalformedURLException + */ + public static boolean createDb(String couchDB_IP, String couchDB_NAME, + String authentication) throws MalformedURLException { + + FullHttpResponse response; + try { + response = HttpRequester.sendPut( + new URL(couchDB_IP + couchDB_NAME), "", "application/json"); + + logger.debug("Response to create_db:" + response); + if (response != null + && response.getStatusLine() != null + && (response.getStatusLine().getStatusCode() == 200 || response + .getStatusLine().getStatusCode() == 201)) { + return true; + + } else { + return false; + } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return false; + + } + + } + + public static String parseRevisionFromCouchdbResponse( + FullHttpResponse fullHttpResponse) { + + String responseBody = fullHttpResponse.getBody(); + + JsonParser parser = new JsonParser(); + JsonObject o = (JsonObject) parser.parse(responseBody); + + return o.get("rev").getAsString().replaceAll("\"+", ""); + } + + public static CouchDBBulkStoreResponse parseRevisionsFromCouchdbResponse( + FullHttpResponse fullHttpResponse) { + + CouchDBBulkStoreResponse response = new CouchDBBulkStoreResponse(); + + String responseBody = fullHttpResponse.getBody(); + + JsonParser parser = new JsonParser(); + JsonArray array = (JsonArray) parser.parse(responseBody); + +// JsonArray array = o.getAsJsonArray(); + for (int i = 0; i < array.size(); i++) { + + JsonObject row = array.get(i).getAsJsonObject(); + if (row.get("rev") != null) { + response.getIdAndRevision().put(row.get("id").getAsString(), + row.get("rev").getAsString()); + } else { + if (row.get("error") != null) { + response.getErrorInsertion().put( + row.get("id").getAsString(), + "Error " + + row.get("error").getAsString() + + ((row.get("reason") != null) ? " Reason " + + row.get("reason").getAsString() + : "")); + } + } + + } + + return response; + } + + /** + * Function that delete the DB inside CouchDB + * + * @throws MalformedURLException + */ + public static boolean deleteDb(String couchDB_IP, String couchDB_NAME, + String authentication) throws MalformedURLException { + + FullHttpResponse response; + try { + + response = HttpRequester.sendDelete(new URL(couchDB_IP + + couchDB_NAME)); + + logger.debug("Response to deleteDb:" + response); + + if (response != null + && response.getStatusLine() != null + && (response.getStatusLine().getStatusCode() == 200 || response + .getStatusLine().getStatusCode() == 204)) { + return true; + + } else { + return false; + } + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return false; + } + + } + + public static void checkViews(String couchDB_IP, String couchDB_NAME) { + + for (View view : View.values()) { + + try { + URL url = new URL(couchDB_IP + couchDB_NAME + "/" + + view.getPath()); + + // Check if the view is there + FullHttpResponse response = HttpRequester.sendGet(url); + + if (response == null) { + logger.error("CouchDB server is not reachable!!!!"); + } else { + + // Store view if not present in the database + if (response.getStatusLine().getStatusCode() == 404) { + logger.info("View missing: " + view.getPath()); + + String jsView = view.getReadyToStoreView(); + + HttpRequester.sendPut(url, jsView, "application/json"); + } + } + + } catch (MalformedURLException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + } + +} diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.indexer/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/indexer/Indexer.java b/eu.neclab.iotplatform.iotbroker.embeddediotagent.indexer/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/indexer/Indexer.java index ddd39969..a80a96bf 100644 --- a/eu.neclab.iotplatform.iotbroker.embeddediotagent.indexer/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/indexer/Indexer.java +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.indexer/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/indexer/Indexer.java @@ -47,7 +47,7 @@ import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; -import java.net.URLDecoder; +//import java.net.URLDecoder; import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.Collection; @@ -203,7 +203,15 @@ public Multimap matchingIdsAndAttributeNames( || entityIdAndType[0].toLowerCase().equals( URLEncoder .encode(entityId.getId(), "UTF-8") - .toLowerCase())) { + .toLowerCase()) + || URLEncoder + .encode(entityIdAndType[0], "UTF-8") + .toLowerCase() + .equals(URLEncoder.encode(entityId.getId(), + "UTF-8").toLowerCase()) + || URLEncoder.encode(entityIdAndType[0], "UTF-8") + .toLowerCase() + .equals(entityId.getId().toLowerCase())) { // Lets first try to look for the one with the exact id Collection attributeNamesCollect = cachedAttributeNamesById @@ -270,7 +278,7 @@ public String generateKeyForHistoricalData(String id, String attributeName, public String formatDate(Date date) { SimpleDateFormat dateFormat = new SimpleDateFormat( - "yyyy-MM-dd'%20'HH:mm:ss.SSS"); + "yyyy-MM-dd HH:mm:ss.SSS"); return dateFormat.format(date); } @@ -378,20 +386,23 @@ private void cacheAttributeNamesById() { // ÿ is the last character of the UTF-8 character table for (String key : keyValueStore.getKeys(LATEST_VALUE_PREFIX, - LATEST_VALUE_PREFIX + "%C3%BF")) { + LATEST_VALUE_PREFIX + "ÿ")) { String[] entityAndAttributeName = key.split(PREFIX_TO_ID_SEPARATOR)[1] .split(ID_TO_ATTRIBUTENAME_SEPARATOR); if (entityAndAttributeName.length == 2) { - try { - cachedAttributeNamesById.put(URLEncoder.encode( - entityAndAttributeName[0], "UTF-8"), - entityAndAttributeName[1]); - } catch (UnsupportedEncodingException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - +// try { +// cachedAttributeNamesById.put(URLEncoder.encode( +// entityAndAttributeName[0], "UTF-8"), +// entityAndAttributeName[1]); +// } catch (UnsupportedEncodingException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } + + cachedAttributeNamesById.put( + entityAndAttributeName[0], + entityAndAttributeName[1]); } } @@ -413,9 +424,9 @@ public Pair generateStartAndEndKeyForLatestValues() { // ÿ is the last character of the UTF-8 character table // %C3%BF is url encoded for ÿ - + String startKey = LATEST_VALUE_PREFIX; - String endKey = LATEST_VALUE_PREFIX + "%C3%BF"; + String endKey = LATEST_VALUE_PREFIX + "ÿ"; return new Pair(startKey, endKey); diff --git a/eu.neclab.iotplatform.iotbroker.embeddediotagent.storage/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/storage/IoTAgentStorage.java b/eu.neclab.iotplatform.iotbroker.embeddediotagent.storage/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/storage/IoTAgentStorage.java index 48411834..a6a635f1 100644 --- a/eu.neclab.iotplatform.iotbroker.embeddediotagent.storage/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/storage/IoTAgentStorage.java +++ b/eu.neclab.iotplatform.iotbroker.embeddediotagent.storage/src/main/java/eu/neclab/iotplatform/iotbroker/embeddediotagent/storage/IoTAgentStorage.java @@ -48,12 +48,14 @@ import java.text.ParsePosition; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -119,7 +121,7 @@ public void setKeyValueStore(KeyValueStoreInterface keyValueStore) { } @Override - public void storeLatestData(ContextElement isolatedContextElement) { + public boolean storeLatestData(ContextElement isolatedContextElement) { String documentKey = indexer .generateKeyForLatestValue( isolatedContextElement.getEntityId().getId(), @@ -146,6 +148,8 @@ public void storeLatestData(ContextElement isolatedContextElement) { } } + return successfullyStored; + } @Override @@ -179,6 +183,124 @@ public void storeHistoricalData(ContextElement isolatedContextElement, } + @Override + public Map storeData( + List isolatedLatestContextElement, + List isolatedHistoricalContextElement, + Date defaultDate) { + + Map successfullyStored = new HashMap(); + + Map historicalContextElementMap = new HashMap(); + Map latestContextElementMap = new HashMap(); + + if (isolatedHistoricalContextElement != null + && !isolatedHistoricalContextElement.isEmpty()) { + + for (ContextElement isolatedContextElement : isolatedHistoricalContextElement) { + Date timestamp = extractTimestamp(isolatedContextElement + .getContextAttributeList().iterator().next()); + + // If no timestamp is found, take the local one. + if (timestamp == null) { + timestamp = defaultDate; + if (logger.isDebugEnabled()) { + logger.debug(String.format("No date found %s", + isolatedContextElement.toJsonString())); + } + } + + String historicalDataDocumentKey = indexer + .generateKeyForHistoricalData(isolatedContextElement + .getEntityId().getId(), (isolatedContextElement + .getEntityId().getType() == null) ? null + : isolatedContextElement.getEntityId() + .getType().toString(), + isolatedContextElement + .getContextAttributeList().iterator() + .next().getName(), timestamp); + + historicalContextElementMap.put(historicalDataDocumentKey, + isolatedContextElement); + } + + } + + if (isolatedLatestContextElement != null + && !isolatedLatestContextElement.isEmpty()) { + + for (ContextElement isolatedContextElement : isolatedLatestContextElement) { + String documentKey = indexer + .generateKeyForLatestValue( + isolatedContextElement.getEntityId().getId(), + (isolatedContextElement.getEntityId().getType() == null) ? null + : isolatedContextElement.getEntityId() + .getType().toString(), + getAttributeNameFromIsolatedContextElement(isolatedContextElement)); + + latestContextElementMap + .put(documentKey, isolatedContextElement); + + } + + } + + Map successfulMap = keyValueStore + .storeAndUpdateValues(historicalContextElementMap, + latestContextElementMap, false); + + for (Entry success : successfulMap.entrySet()) { + + boolean isLatestValue = false; + ContextElement contextElement = null; + if (historicalContextElementMap.containsKey(success.getKey())) { + contextElement = historicalContextElementMap.get(success + .getKey()); + } else if (latestContextElementMap.containsKey(success.getKey())) { + contextElement = latestContextElementMap.get(success.getKey()); + isLatestValue = true; + } + + if (contextElement == null) { + if (success.getValue()) { + if (logger.isDebugEnabled()) { + logger.debug(String.format( + "Stored something on wrong key: %s", + success.getKey())); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug(String + .format("Tried (unsuccessfully) to stored something on wrong key: %s", + success.getKey())); + } + } + } else { + successfullyStored.put(contextElement, success.getValue()); + + if (success.getValue()) { + if (logger.isDebugEnabled()) { + logger.debug(String.format( + "ContextElement %s \n\tsuccessfully stored", + contextElement)); + } + if (isLatestValue) { + indexer.index(contextElement); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug(String + .format("ContextElement %s \n\tnot successfully stored", + contextElement)); + } + } + } + } + + return successfullyStored; + + } + private String getAttributeNameFromIsolatedContextElement( ContextElement isolatedContextElement) { return Iterables.getFirst( @@ -201,8 +323,10 @@ private Date extractTimestamp(ContextAttribute contextAttribute) { if (contextMetadata.getName() != null && (contextMetadata.getName().equalsIgnoreCase( - "creation_time") || contextMetadata.getName() - .equalsIgnoreCase("endtime"))) { + "creation_time") + || contextMetadata.getName().equalsIgnoreCase( + "endtime") || contextMetadata.getName() + .equalsIgnoreCase("nle:date"))) { /* * This contextMetadata is set by the leafengine connector @@ -332,7 +456,7 @@ public List getLatestValues(List entityIdList, entityIdList, attributeNames)); } - List contextElementList = new ArrayList(); + Collection contextElementList = new ArrayList(); Set attributeNamesSet; if (attributeNames != null && !attributeNames.isEmpty()) { @@ -457,25 +581,36 @@ public List getLatestValues(List entityIdList, documentKeys)); } - contextElementList = keyValueStore.getValues(documentKeys); + if (!documentKeys.isEmpty()) { + + contextElementList = keyValueStore.getValues(documentKeys); + } else { + contextElementList = new ArrayList(); + } Map compactedContextElementsMap = new HashMap(); - for (ContextElement contextElement : contextElementList) { + if (contextElementList.size() > 2) { + for (ContextElement contextElement : contextElementList) { - String entityKey = contextElement.getEntityId().getId() - + contextElement.getEntityId().getType(); + String entityKey = contextElement.getEntityId().getId() + + contextElement.getEntityId().getType(); - if (compactedContextElementsMap.containsKey(entityKey)) { + if (compactedContextElementsMap.containsKey(entityKey)) { - compactedContextElementsMap.get(entityKey) - .getContextAttributeList() - .addAll(contextElement.getContextAttributeList()); + compactedContextElementsMap.get(entityKey) + .getContextAttributeList() + .addAll(contextElement.getContextAttributeList()); - } else { - compactedContextElementsMap.put(entityKey, contextElement); - } + } else { + compactedContextElementsMap.put(entityKey, contextElement); + } + } + return new ArrayList( + compactedContextElementsMap.values()); + } else { + return new ArrayList(contextElementList); } // try { @@ -484,9 +619,6 @@ public List getLatestValues(List entityIdList, // e.printStackTrace(); // } - return new ArrayList( - compactedContextElementsMap.values()); - } @Override @@ -508,7 +640,9 @@ public List getHistoricalValues( .synchronizedList(contextElementList); Set attributeNamesSet = new HashSet(); - attributeNamesSet.addAll(attributeNames); + if (attributeNames != null && !attributeNames.isEmpty()) { + attributeNamesSet.addAll(attributeNames); + } final Multimap idsAndAttributeNames = indexer .matchingIdsAndAttributeNames(entityIdList, attributeNamesSet); diff --git a/eu.neclab.iotplatform.iotbroker.restcontroller/src/main/java/eu/neclab/iotplatform/iotbroker/restcontroller/RestProviderController.java b/eu.neclab.iotplatform.iotbroker.restcontroller/src/main/java/eu/neclab/iotplatform/iotbroker/restcontroller/RestProviderController.java index d9dbe55e..c79b2c33 100644 --- a/eu.neclab.iotplatform.iotbroker.restcontroller/src/main/java/eu/neclab/iotplatform/iotbroker/restcontroller/RestProviderController.java +++ b/eu.neclab.iotplatform.iotbroker.restcontroller/src/main/java/eu/neclab/iotplatform/iotbroker/restcontroller/RestProviderController.java @@ -375,7 +375,7 @@ public ResponseEntity queryContext( HttpServletRequest requester, @RequestBody QueryContextRequest request) { - logger.info(" <--- NGSI-10 has received request for Context query resource ---> \n"); + logger.info("\n\n <--- NGSI-10 has received request for Context query resource ---> \n"); // System.out.println(request); diff --git a/eu.neclab.iotplatform.iotbroker.storage/src/main/java/eu/neclab/iotplatform/iotbroker/storage/impl/SubscriptionStorage.java b/eu.neclab.iotplatform.iotbroker.storage/src/main/java/eu/neclab/iotplatform/iotbroker/storage/impl/SubscriptionStorage.java index 6b84f6e7..69308c84 100644 --- a/eu.neclab.iotplatform.iotbroker.storage/src/main/java/eu/neclab/iotplatform/iotbroker/storage/impl/SubscriptionStorage.java +++ b/eu.neclab.iotplatform.iotbroker.storage/src/main/java/eu/neclab/iotplatform/iotbroker/storage/impl/SubscriptionStorage.java @@ -1523,7 +1523,7 @@ private PreparedStatement createCheckSubscriptionQueryStatement( // // // Possible matches: - // A -> 1, 4 + // A -> 1, 4ss // B -> 1, 2, 3, 4 /* diff --git a/fiwareRelease/iotbrokerconfig/embeddedAgent/couchdb.xml b/fiwareRelease/iotbrokerconfig/embeddedAgent/couchdb.xml index 9b306847..42c84a9f 100755 --- a/fiwareRelease/iotbrokerconfig/embeddedAgent/couchdb.xml +++ b/fiwareRelease/iotbrokerconfig/embeddedAgent/couchdb.xml @@ -9,4 +9,8 @@ +true +haregistry20170707173538 +embeddedagent1 + diff --git a/targetPlatform/bundles/caffeine-2.5.2.jar b/targetPlatform/bundles/caffeine-2.5.2.jar new file mode 100644 index 00000000..ed47c928 Binary files /dev/null and b/targetPlatform/bundles/caffeine-2.5.2.jar differ