Skip to content

Commit

Permalink
Merge pull request #300 from ScorpioBroker/subfix
Browse files Browse the repository at this point in the history
Subfix
  • Loading branch information
ScorpioBroker committed Apr 3, 2022
2 parents 868152f + e2ef442 commit 2b8f739
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 35 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@
Core/QueryManager/logs/
Storage/StorageManager/logs/
Go/pkg/
/*.db
/*.db
/*/bin
/*/*/bin
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package eu.neclab.ngsildbroker.commons.subscriptionbase;

import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ThreadLocalRandom;

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
Expand Down Expand Up @@ -59,6 +62,7 @@ class NotificationHandlerREST extends BaseNotificationHandler {
protected void sendReply(Notification notification, SubscriptionRequest request) throws Exception {
ResponseEntity<String> compacted;
compacted = notification.toCompactedJson();

HttpEntity<String> entity = new HttpEntity<String>(compacted.getBody(), compacted.getHeaders());

int retryCount = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,10 @@ public static ResponseEntity<String> generateNotification(ArrayListMultimap<Stri
acceptHeader = new ArrayList<String>();
acceptHeader.add("application/json");
}

String body = getReplyBody(acceptHeader, AppConstants.QUERY_ENDPOINT, headers, notificationData, true,
ldContext, context, geometryProperty);
headers.put("Content-Length", body.length() + "");
return ResponseEntity.ok().headers(getHttpHeaders(headers)).body(body);
}

Expand Down Expand Up @@ -685,7 +687,16 @@ public static HttpHeaders getAdditionalHeaders(Map<String, Object> registration,
private static HttpHeaders getHttpHeaders(ArrayListMultimap<String, String> headers) {
HttpHeaders result = new HttpHeaders();
for (String key : headers.keySet()) {
result.put(key, headers.get(key));
switch (key.toLowerCase()) {
case "postman-token":
case "accept-encoding":
case "user-agent":
case "host":
break;
default:
result.put(key, headers.get(key));
}

}
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion OverallParent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.6.1</version>
<version>2.6.6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public class CSourceService extends BaseQueryService implements EntryCRUDService
@Value("${scorpio.directDB}")
boolean directDB = true;

//private ArrayListMultimap<String, String> csourceIds = ArrayListMultimap.create();
// private ArrayListMultimap<String, String> csourceIds =
// ArrayListMultimap.create();

HashMap<String, TimerTask> regId2TimerTask = new HashMap<String, TimerTask>();
Timer watchDog = new Timer(true);
Expand All @@ -94,7 +95,7 @@ public class CSourceService extends BaseQueryService implements EntryCRUDService

@SuppressWarnings("unused")
private void loadStoredEntitiesDetails() throws IOException, ResponseException {
//this.csourceIds = csourceInfoDAO.getAllIds();
// this.csourceIds = csourceInfoDAO.getAllIds();
if (AUTO_REG_STATUS.equals("active")) {
Map<String, List<String>> tenant2Entity = csourceInfoDAO.getAllEntities();
for (Entry<String, List<String>> entry : tenant2Entity.entrySet()) {
Expand Down Expand Up @@ -192,11 +193,11 @@ public UpdateResult appendToEntry(ArrayListMultimap<String, String> headers, Str
Map<String, Object> originalRegistration = validateIdAndGetBodyAsMap(registrationId, tenantId);
AppendCSourceRequest request = new AppendCSourceRequest(headers, registrationId, originalRegistration, entry,
options);
TimerTask task = regId2TimerTask.get(registrationId);
if (task != null) {
task.cancel();
}
this.csourceTimerTask(headers, request.getFinalPayload());
TimerTask task = regId2TimerTask.get(registrationId);
if (task != null) {
task.cancel();
}
this.csourceTimerTask(headers, request.getFinalPayload());
pushToDB(request);
sendToKafka(request);
return request.getUpdateResult();
Expand All @@ -219,11 +220,12 @@ public String createEntry(ArrayListMultimap<String, String> headers, Map<String,
id = (String) idObj;
}
CSourceRequest request = new CreateCSourceRequest(resolved, headers, id);
/*String tenantId = HttpUtils.getInternalTenant(headers);
if (this.csourceIds.containsEntry(tenantId, request.getId())) {
throw new ResponseException(ErrorType.AlreadyExists, "CSource already exists");
}
this.csourceIds.put(tenantId, request.getId());*/
/*
* String tenantId = HttpUtils.getInternalTenant(headers); if
* (this.csourceIds.containsEntry(tenantId, request.getId())) { throw new
* ResponseException(ErrorType.AlreadyExists, "CSource already exists"); }
* this.csourceIds.put(tenantId, request.getId());
*/
pushToDB(request);
sendToKafka(request);
return request.getId();
Expand All @@ -249,16 +251,17 @@ public boolean deleteEntry(ArrayListMultimap<String, String> headers, String reg

String tenantId = HttpUtils.getInternalTenant(headers);

//if (!this.csourceIds.containsEntry(tenantId, registrationId)) {
// throw new ResponseException(ErrorType.NotFound, registrationId + " not found.");
//}
// if (!this.csourceIds.containsEntry(tenantId, registrationId)) {
// throw new ResponseException(ErrorType.NotFound, registrationId + " not
// found.");
// }

Map<String, Object> registration = validateIdAndGetBodyAsMap(registrationId, tenantId);
CSourceRequest requestForSub = new DeleteCSourceRequest(registration, headers, registrationId);
sendToKafka(requestForSub);
CSourceRequest request = new DeleteCSourceRequest(null, headers, registrationId);
pushToDB(request);
//this.csourceIds.remove(tenantId, registrationId);
// this.csourceIds.remove(tenantId, registrationId);
return true;

}
Expand Down Expand Up @@ -293,6 +296,9 @@ public void handleEntityDelete(BaseRequest message) {
String tenant = message.getTenant();
Map<String, Object> informationEntry = tenant2EntityId2InformationEntry.remove(tenant, id);
Set<String> ids = tenant2InformationEntry2EntityIds.get(tenant, informationEntry);
if (ids == null) {
return;
}
ids.remove(id);
if (ids.isEmpty()) {
tenant2InformationEntry2EntityIds.remove(tenant, informationEntry);
Expand Down Expand Up @@ -453,24 +459,18 @@ private Map<String, Object> getInformationFromEntity(Map<String, Object> entity)
return null;
}

private void storeInternalEntry(CSourceRequest regEntry) {
private void storeInternalEntry(CSourceRequest regEntry) {
try {
appendToEntry(regEntry.getHeaders(), regEntry.getId(), regEntry.getFinalPayload(), null);
} catch (ResponseException e) {
try {
appendToEntry(regEntry.getHeaders(), regEntry.getId(), regEntry.getFinalPayload(), null);
} catch (ResponseException e) {
try {
createEntry(regEntry.getHeaders(), regEntry.getFinalPayload());
} catch (Exception e1) {
logger.error("Failed to store internal regentry", e1);
}
} catch (Exception e) {
logger.error("Failed to store internal regentry", e);
createEntry(regEntry.getHeaders(), regEntry.getFinalPayload());
} catch (Exception e1) {
logger.error("Failed to store internal regentry", e1);
}






} catch (Exception e) {
logger.error("Failed to store internal regentry", e);
}

}

Expand Down

0 comments on commit 2b8f739

Please sign in to comment.