Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public static PublishAuditAPI getInstance(){
//Select
public abstract PublishAuditStatus getPublishAuditStatus(String bundleId) throws DotPublisherException;

public abstract List<PublishAuditStatus> getPublishAuditStatuses(List<String> bundleId)
throws DotPublisherException;


/**
* Return the {@link PublishAuditStatus} for a bundle
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class PublishAuditAPIImpl extends PublishAuditAPI {
private static final String UPDATE_ALL_BY_BUNDLEID ="update publishing_queue_audit set status = ?, status_pojo = ?, create_date = ?, status_updated = ? where bundle_id = ? ";
private static final String DELETE_BY_BUNDLEID ="delete from publishing_queue_audit where bundle_id = ? ";
private static final String SELECT_ALL_BY_BUNDLEID = "select * from publishing_queue_audit where bundle_id = ? ";
private static final String SELECT_ALL_BY_BUNDLES_IDS = "select * from publishing_queue_audit where bundle_id in (%s) ";
private static final String SELECT_ALL_ORDER_BY_STATUSUPDATED_DESC = "SELECT * FROM publishing_queue_audit order by status_updated desc";
private static final String SELECT_ALL_BY_BUNDLE_ID_QUERY = "SELECT * FROM publishing_queue_audit WHERE LOWER(bundle_id) LIKE ? ORDER BY status_updated DESC";
private static final String SELECT_MAX_CREATEDATE_BY_STATUS_ISNOT_BUNDLING = "select max(c.create_date) as max_date from publishing_queue_audit c where c.status != ? ";
Expand Down Expand Up @@ -219,6 +220,31 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId)
return getPublishAuditStatus(bundleId, NO_LIMIT_ASSETS);
}

@Override
@CloseDBIfOpened
public List<PublishAuditStatus> getPublishAuditStatuses(List<String> bundleIds)
throws DotPublisherException {
try {
final List<PublishAuditStatus> result = new ArrayList<>();

DotConnect dc = new DotConnect();
final List<String> parameter = bundleIds.stream().map(id -> "'" + id + "'").collect(Collectors.toList());

dc.setSQL(String.format(SELECT_ALL_BY_BUNDLES_IDS, String.join(",", parameter)));
List<Map<String, Object>> items = dc.loadObjectResults();

for(Map<String, Object> item: items) {
result.add(turnIntoPublishAuditStatus(NO_LIMIT_ASSETS, item));
}

return result;
}catch(Exception e){
Logger.debug(PublisherUtil.class,e.getMessage(),e);
throw new DotPublisherException("Unable to get list of elements with error:"+e.getMessage(), e);
}

}

public PublishAuditStatus getPublishAuditStatus(String bundleId, int assetsLimit) throws DotPublisherException {
try{
DotConnect dc = new DotConnect();
Expand All @@ -227,18 +253,14 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId, int assetsLimit
dc.addParam(bundleId);

List<Map<String, Object>> res = dc.loadObjectResults();

if(res.size() > 1) {
throw new DotPublisherException("Found duplicate bundle status");
} else {
if(!res.isEmpty()) {
final Map<String, Object> publishAuditStatusMap = res.get(0);
final LimitedAssetResult limitedAssetResult = limitAssets(
publishAuditStatusMap.get("status_pojo").toString(), assetsLimit);

putStatusPojoAndNumberOfAssets(publishAuditStatusMap,
limitedAssetResult.newStatusPojo, limitedAssetResult.numberTotalOfAssets);
return mapper.mapObject(publishAuditStatusMap);
return turnIntoPublishAuditStatus(assetsLimit, res.get(0));
}

return null;
}
}catch(Exception e){
Expand All @@ -247,6 +269,16 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId, int assetsLimit
}
}

private PublishAuditStatus turnIntoPublishAuditStatus(int assetsLimit, Map<String, Object> map) {
final Map<String, Object> publishAuditStatusMap = map;
final LimitedAssetResult limitedAssetResult = limitAssets(
publishAuditStatusMap.get("status_pojo").toString(), assetsLimit);

putStatusPojoAndNumberOfAssets(publishAuditStatusMap,
limitedAssetResult.newStatusPojo, limitedAssetResult.numberTotalOfAssets);
return mapper.mapObject(publishAuditStatusMap);
}

private void putStatusPojoAndNumberOfAssets(
final Map<String, Object> publishAuditStatusMap,
final String newStatusPojo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.dotcms.repackage.com.google.common.collect.Maps;
import com.dotcms.repackage.com.google.common.collect.Sets;
import com.dotcms.rest.RestClientBuilder;
import com.dotcms.util.JsonUtil;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.exception.DotSecurityException;
Expand All @@ -34,6 +35,7 @@
import com.dotmarketing.util.PushPublishLogger;
import com.dotmarketing.util.UtilMethods;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.liferay.portal.language.LanguageException;
import com.liferay.portal.language.LanguageUtil;
import com.liferay.util.StringPool;
Expand All @@ -43,14 +45,14 @@
import org.quartz.StatefulJob;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;

/**
* This Quartz Job runs at second zero of every minute, and is in charge of two main content
Expand Down Expand Up @@ -97,6 +99,7 @@ public class PublisherQueueJob implements StatefulJob {
private static final String ENDPOINT_NAME = "EndpointName";

public static final Integer MAX_NUM_TRIES = Config.getIntProperty("PUBLISHER_QUEUE_MAX_TRIES", 3);
public static final int MAX_SIZE_PP_AUDIT_PAYLOAD = Config.getIntProperty("MAX_SIZE_PP_AUDIT_PAYLOAD", 10000);

private final PublishAuditAPI pubAuditAPI = PublishAuditAPI.getInstance();
private final PublishingEndPointAPI endpointAPI = APILocator.getPublisherEndPointAPI();
Expand Down Expand Up @@ -275,39 +278,30 @@ private Map<String, Map<String, EndpointDetail>> collectEndpointInfoFromRemote(f
final Map<String, Map<String, EndpointDetail>> endpointsMap = localHistory.getEndpointsMap();

final Client client = getRestClient();
final Map<String, BundlesToSent> bundlesToSent = new HashMap<>();

// For each group (environment)
for (final String groupID : endpointsMap.keySet() ) {
final Map<String, EndpointDetail> endpointsGroup = endpointsMap.get(groupID);
// For each end-point (server) in the group
for (final String endpointID : endpointsGroup.keySet() ) {
final PublishingEndPoint targetEndpoint = endpointAPI.findEndPointById(endpointID);

if (targetEndpoint != null && !targetEndpoint.isSending()) {
ThreadContext.put(ENDPOINT_NAME, ENDPOINT_NAME + "=" + targetEndpoint.getServerName());
// Don't poll status for static publishing
if (!AWSS3Publisher.PROTOCOL_AWS_S3.equalsIgnoreCase(targetEndpoint.getProtocol())
&& !StaticPublisher.PROTOCOL_STATIC.equalsIgnoreCase(targetEndpoint.getProtocol())) {
try {
// Try to get the status of the remote end-points to
// update the local history
final PublishAuditHistory remoteHistory = getRemoteHistoryFromEndpoint(
bundleAudit, targetEndpoint, client);
if (remoteHistory != null) {
updateLocalPublishDatesFromRemote(localHistory, remoteHistory);
endpointTrackingMap.putAll(remoteHistory.getEndpointsMap());
updateLocalEndpointDetailFromRemote(localHistory, groupID, endpointID, remoteHistory);
}
} catch (final Exception e) {
// An error occurred when retrieving the end-point's audit info.
// Usually caused by a network problem.
Logger.error(PublisherQueueJob.class, "An error occurred when updating audit status from " +
"endpoint=[" + targetEndpoint.toURL() + "], bundle=[" + bundleAudit.getBundleId()
+ "] : " + ExceptionUtil.getErrorMessage(e), e);
final String failedAuditUpdate = "failed-remote-group-" + System.currentTimeMillis();
final EndpointDetail detail = new EndpointDetail();
detail.setStatus(Status.FAILED_TO_PUBLISH.getCode());
endpointTrackingMap.put(failedAuditUpdate, Map.of(failedAuditUpdate, detail));
PushPublishLogger.log(this.getClass(), "Status update: Failed to update bundle audit status.");

final BundlesToSent bundleToSend = bundlesToSent.computeIfAbsent(targetEndpoint.getId(),
key -> new BundlesToSent(targetEndpoint));

bundleToSend.add(bundleAudit.getBundleId());

if (bundleToSend.limitReach()) {
sendBundle(bundleToSend.bundleIds, bundleToSend.targetEndpoint,
client, localHistory, endpointTrackingMap);
bundleToSend.reset();
}
} else {
final PublishAuditStatus auditStatus = pubAuditAPI.getPublishAuditStatus(bundleAudit.getBundleId());
Expand All @@ -316,9 +310,59 @@ private Map<String, Map<String, EndpointDetail>> collectEndpointInfoFromRemote(f
}
}
}

for (BundlesToSent toSent : bundlesToSent.values()) {

if (toSent.isEmpty()) {
continue;
}

List<List<String>> partitions = Lists.partition(toSent.bundleIds, MAX_SIZE_PP_AUDIT_PAYLOAD);

for (List<String> partition : partitions) {
sendBundle(partition, toSent.targetEndpoint, client, localHistory, endpointTrackingMap);
}
}

return endpointTrackingMap;
}

private void sendBundle(List<String> bundleIds, PublishingEndPoint targetEndpoint, Client client,
PublishAuditHistory localHistory, Map<String, Map<String, EndpointDetail>> endpointTrackingMap)
throws DotDataException {

// Try to get the status of the remote end-points to
// update the local history
getRemoteHistoryFromEndpoint(bundleIds, targetEndpoint, client)
.stream()
.filter(Objects::nonNull)
.forEach(remoteHistory -> {
sendBundle(bundleIds, targetEndpoint, localHistory,
endpointTrackingMap, remoteHistory);
});

}

private void sendBundle(List<String> bundleIds, PublishingEndPoint targetEndpoint, PublishAuditHistory localHistory,
Map<String, Map<String, EndpointDetail>> endpointTrackingMap, PublishAuditHistory remoteHistory) {
try {
updateLocalPublishDatesFromRemote(localHistory, remoteHistory);
endpointTrackingMap.putAll(remoteHistory.getEndpointsMap());
updateLocalEndpointDetailFromRemote(localHistory, targetEndpoint.getGroupId(), targetEndpoint.getId(), remoteHistory);
} catch (final Exception e) {
// An error occurred when retrieving the end-point's audit info.
// Usually caused by a network problem.
Logger.error(PublisherQueueJob.class, "An error occurred when updating audit status from " +
"endpoint=[" + targetEndpoint.toURL() + "], bundle=[" + bundleIds.get(0)
+ "] : " + ExceptionUtil.getErrorMessage(e), e);
final String failedAuditUpdate = "failed-remote-group-" + System.currentTimeMillis();
final EndpointDetail detail = new EndpointDetail();
detail.setStatus(Status.FAILED_TO_PUBLISH.getCode());
endpointTrackingMap.put(failedAuditUpdate, Map.of(failedAuditUpdate, detail));
PushPublishLogger.log(this.getClass(), "Status update: Failed to update bundle audit status.");
}
}

/**
* Updates the local Publish Audit History with the information retrieved from a remote
* end-point.
Expand Down Expand Up @@ -599,15 +643,25 @@ private void updateLocalPublishDatesFromRemote(final PublishAuditHistory localHi
* retrieved.
* @return The {@link PublishAuditHistory} of the bundle in the specified end-point.
*/
private PublishAuditHistory getRemoteHistoryFromEndpoint(final PublishAuditStatus bundleAudit,
private List<PublishAuditHistory> getRemoteHistoryFromEndpoint(final List<String> bundleIds,
final PublishingEndPoint targetEndpoint,
final Client client) {
final WebTarget webTarget = client.target(targetEndpoint.toURL() + "/api/auditPublishing");
return PublishAuditHistory.getObjectFromString(
webTarget
.path("get")
.path(bundleAudit.getBundleId()).request().get(String.class));
}
final Client client) throws DotDataException {
final WebTarget webTarget = client.target(targetEndpoint.toURL() + "/api/auditPublishing/getAll");

final String responseBody = webTarget
.request(MediaType.APPLICATION_JSON)
.post(Entity.entity(bundleIds, MediaType.APPLICATION_JSON))
.readEntity(String.class);

try {
return (List<PublishAuditHistory>) JsonUtil.getObjectFromJson(responseBody, List.class)
.stream()
.map(item -> PublishAuditHistory.getObjectFromString(item.toString()))
.collect(Collectors.toList());
} catch (IOException e) {
throw new DotDataException(e);
}
}

/**
* Get the Publisher needed depending on the protocol of the end-points of
Expand Down Expand Up @@ -758,4 +812,36 @@ private void updateAuditStatusErrorMsg(final PublishAuditHistory auditHistory, f
new HashMap<>(Map.of(StringPool.BLANK, Map.of(StringPool.BLANK, endpointDetail))));
}

private static class BundlesToSent {
final PublishingEndPoint targetEndpoint;
List<String> bundleIds;


public BundlesToSent(final PublishingEndPoint targetEndpoint) {
this.targetEndpoint = targetEndpoint;
this.bundleIds = new ArrayList<>();
}

public void add(String publisherStatus) {
bundleIds.add(publisherStatus);
}

public List<String> getPublishAuditStatuses() {
return bundleIds;
}

public boolean limitReach() {
return bundleIds.size() >= MAX_SIZE_PP_AUDIT_PAYLOAD;
}

public void reset() {
bundleIds.clear();
}

public boolean isEmpty() {
return bundleIds.isEmpty();
}
}


}
28 changes: 24 additions & 4 deletions dotCMS/src/main/java/com/dotcms/rest/AuditPublishingResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import com.dotcms.publisher.business.DotPublisherException;
import com.dotcms.publisher.business.PublishAuditAPI;
import com.dotcms.publisher.business.PublishAuditStatus;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.google.common.collect.Lists;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.tags.Tag;

import java.util.List;

@Path("/auditPublishing")
@Tag(name = "Publishing")
public class AuditPublishingResource {
Expand All @@ -35,5 +39,21 @@ public Response get(@PathParam("bundleId") String bundleId) {
return Response.status(404).build();
}

@POST
@Path("/getAll")
@Produces(MediaType.APPLICATION_JSON)
public Response getAll( List<String> bundleIds) {
try {
final List<PublishAuditStatus> statuses = auditAPI.getPublishAuditStatuses(bundleIds);

if(statuses != null)
return Response.ok( statuses.stream().map(status -> status.getStatusPojo().getSerialized() ) ).build();
} catch (DotPublisherException e) {
Logger.warn(this, "error trying to get status for bundle "+bundleIds.get(0),e);
}

return Response.status(404).build();
}


}
Loading
Loading