diff --git a/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPI.java b/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPI.java index fa308b48bdb6..3ad53e4a7ae2 100644 --- a/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPI.java +++ b/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPI.java @@ -79,6 +79,10 @@ public static PublishAuditAPI getInstance(){ //Select public abstract PublishAuditStatus getPublishAuditStatus(String bundleId) throws DotPublisherException; + public abstract List getPublishAuditStatuses(List bundleId) + throws DotPublisherException; + + /** * Return the {@link PublishAuditStatus} for a bundle * diff --git a/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPIImpl.java b/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPIImpl.java index 573bb074049e..f9914da94aa1 100644 --- a/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/publisher/business/PublishAuditAPIImpl.java @@ -44,6 +44,7 @@ public class PublishAuditAPIImpl extends PublishAuditAPI { private static final String UPDATE_ALL_BY_BUNDLEID ="update publishing_queue_audit set status = ?, status_pojo = ?, create_date = ?, status_updated = ? where bundle_id = ? "; private static final String DELETE_BY_BUNDLEID ="delete from publishing_queue_audit where bundle_id = ? "; private static final String SELECT_ALL_BY_BUNDLEID = "select * from publishing_queue_audit where bundle_id = ? "; + private static final String SELECT_ALL_BY_BUNDLES_IDS = "select * from publishing_queue_audit where bundle_id in (%s) "; private static final String SELECT_ALL_ORDER_BY_STATUSUPDATED_DESC = "SELECT * FROM publishing_queue_audit order by status_updated desc"; private static final String SELECT_ALL_BY_BUNDLE_ID_QUERY = "SELECT * FROM publishing_queue_audit WHERE LOWER(bundle_id) LIKE ? ORDER BY status_updated DESC"; private static final String SELECT_MAX_CREATEDATE_BY_STATUS_ISNOT_BUNDLING = "select max(c.create_date) as max_date from publishing_queue_audit c where c.status != ? "; @@ -219,6 +220,31 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId) return getPublishAuditStatus(bundleId, NO_LIMIT_ASSETS); } + @Override + @CloseDBIfOpened + public List getPublishAuditStatuses(List bundleIds) + throws DotPublisherException { + try { + final List result = new ArrayList<>(); + + DotConnect dc = new DotConnect(); + final List parameter = bundleIds.stream().map(id -> "'" + id + "'").collect(Collectors.toList()); + + dc.setSQL(String.format(SELECT_ALL_BY_BUNDLES_IDS, String.join(",", parameter))); + List> items = dc.loadObjectResults(); + + for(Map item: items) { + result.add(turnIntoPublishAuditStatus(NO_LIMIT_ASSETS, item)); + } + + return result; + }catch(Exception e){ + Logger.debug(PublisherUtil.class,e.getMessage(),e); + throw new DotPublisherException("Unable to get list of elements with error:"+e.getMessage(), e); + } + + } + public PublishAuditStatus getPublishAuditStatus(String bundleId, int assetsLimit) throws DotPublisherException { try{ DotConnect dc = new DotConnect(); @@ -227,18 +253,14 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId, int assetsLimit dc.addParam(bundleId); List> res = dc.loadObjectResults(); + if(res.size() > 1) { throw new DotPublisherException("Found duplicate bundle status"); } else { if(!res.isEmpty()) { - final Map publishAuditStatusMap = res.get(0); - final LimitedAssetResult limitedAssetResult = limitAssets( - publishAuditStatusMap.get("status_pojo").toString(), assetsLimit); - - putStatusPojoAndNumberOfAssets(publishAuditStatusMap, - limitedAssetResult.newStatusPojo, limitedAssetResult.numberTotalOfAssets); - return mapper.mapObject(publishAuditStatusMap); + return turnIntoPublishAuditStatus(assetsLimit, res.get(0)); } + return null; } }catch(Exception e){ @@ -247,6 +269,16 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId, int assetsLimit } } + private PublishAuditStatus turnIntoPublishAuditStatus(int assetsLimit, Map map) { + final Map publishAuditStatusMap = map; + final LimitedAssetResult limitedAssetResult = limitAssets( + publishAuditStatusMap.get("status_pojo").toString(), assetsLimit); + + putStatusPojoAndNumberOfAssets(publishAuditStatusMap, + limitedAssetResult.newStatusPojo, limitedAssetResult.numberTotalOfAssets); + return mapper.mapObject(publishAuditStatusMap); + } + private void putStatusPojoAndNumberOfAssets( final Map publishAuditStatusMap, final String newStatusPojo, diff --git a/dotCMS/src/main/java/com/dotcms/publisher/business/PublisherQueueJob.java b/dotCMS/src/main/java/com/dotcms/publisher/business/PublisherQueueJob.java index 2fcf9c8c2612..b2c943069ce8 100644 --- a/dotCMS/src/main/java/com/dotcms/publisher/business/PublisherQueueJob.java +++ b/dotCMS/src/main/java/com/dotcms/publisher/business/PublisherQueueJob.java @@ -26,6 +26,7 @@ import com.dotcms.repackage.com.google.common.collect.Maps; import com.dotcms.repackage.com.google.common.collect.Sets; import com.dotcms.rest.RestClientBuilder; +import com.dotcms.util.JsonUtil; import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.exception.DotSecurityException; @@ -34,6 +35,7 @@ import com.dotmarketing.util.PushPublishLogger; import com.dotmarketing.util.UtilMethods; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.liferay.portal.language.LanguageException; import com.liferay.portal.language.LanguageUtil; import com.liferay.util.StringPool; @@ -43,14 +45,14 @@ import org.quartz.StatefulJob; import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; /** * This Quartz Job runs at second zero of every minute, and is in charge of two main content @@ -97,6 +99,7 @@ public class PublisherQueueJob implements StatefulJob { private static final String ENDPOINT_NAME = "EndpointName"; public static final Integer MAX_NUM_TRIES = Config.getIntProperty("PUBLISHER_QUEUE_MAX_TRIES", 3); + public static final int MAX_SIZE_PP_AUDIT_PAYLOAD = Config.getIntProperty("MAX_SIZE_PP_AUDIT_PAYLOAD", 10000); private final PublishAuditAPI pubAuditAPI = PublishAuditAPI.getInstance(); private final PublishingEndPointAPI endpointAPI = APILocator.getPublisherEndPointAPI(); @@ -275,6 +278,7 @@ private Map> collectEndpointInfoFromRemote(f final Map> endpointsMap = localHistory.getEndpointsMap(); final Client client = getRestClient(); + final Map bundlesToSent = new HashMap<>(); // For each group (environment) for (final String groupID : endpointsMap.keySet() ) { @@ -282,32 +286,22 @@ private Map> collectEndpointInfoFromRemote(f // For each end-point (server) in the group for (final String endpointID : endpointsGroup.keySet() ) { final PublishingEndPoint targetEndpoint = endpointAPI.findEndPointById(endpointID); + if (targetEndpoint != null && !targetEndpoint.isSending()) { ThreadContext.put(ENDPOINT_NAME, ENDPOINT_NAME + "=" + targetEndpoint.getServerName()); // Don't poll status for static publishing if (!AWSS3Publisher.PROTOCOL_AWS_S3.equalsIgnoreCase(targetEndpoint.getProtocol()) && !StaticPublisher.PROTOCOL_STATIC.equalsIgnoreCase(targetEndpoint.getProtocol())) { - try { - // Try to get the status of the remote end-points to - // update the local history - final PublishAuditHistory remoteHistory = getRemoteHistoryFromEndpoint( - bundleAudit, targetEndpoint, client); - if (remoteHistory != null) { - updateLocalPublishDatesFromRemote(localHistory, remoteHistory); - endpointTrackingMap.putAll(remoteHistory.getEndpointsMap()); - updateLocalEndpointDetailFromRemote(localHistory, groupID, endpointID, remoteHistory); - } - } catch (final Exception e) { - // An error occurred when retrieving the end-point's audit info. - // Usually caused by a network problem. - Logger.error(PublisherQueueJob.class, "An error occurred when updating audit status from " + - "endpoint=[" + targetEndpoint.toURL() + "], bundle=[" + bundleAudit.getBundleId() - + "] : " + ExceptionUtil.getErrorMessage(e), e); - final String failedAuditUpdate = "failed-remote-group-" + System.currentTimeMillis(); - final EndpointDetail detail = new EndpointDetail(); - detail.setStatus(Status.FAILED_TO_PUBLISH.getCode()); - endpointTrackingMap.put(failedAuditUpdate, Map.of(failedAuditUpdate, detail)); - PushPublishLogger.log(this.getClass(), "Status update: Failed to update bundle audit status."); + + final BundlesToSent bundleToSend = bundlesToSent.computeIfAbsent(targetEndpoint.getId(), + key -> new BundlesToSent(targetEndpoint)); + + bundleToSend.add(bundleAudit.getBundleId()); + + if (bundleToSend.limitReach()) { + sendBundle(bundleToSend.bundleIds, bundleToSend.targetEndpoint, + client, localHistory, endpointTrackingMap); + bundleToSend.reset(); } } else { final PublishAuditStatus auditStatus = pubAuditAPI.getPublishAuditStatus(bundleAudit.getBundleId()); @@ -316,9 +310,59 @@ private Map> collectEndpointInfoFromRemote(f } } } + + for (BundlesToSent toSent : bundlesToSent.values()) { + + if (toSent.isEmpty()) { + continue; + } + + List> partitions = Lists.partition(toSent.bundleIds, MAX_SIZE_PP_AUDIT_PAYLOAD); + + for (List partition : partitions) { + sendBundle(partition, toSent.targetEndpoint, client, localHistory, endpointTrackingMap); + } + } + return endpointTrackingMap; } + private void sendBundle(List bundleIds, PublishingEndPoint targetEndpoint, Client client, + PublishAuditHistory localHistory, Map> endpointTrackingMap) + throws DotDataException { + + // Try to get the status of the remote end-points to + // update the local history + getRemoteHistoryFromEndpoint(bundleIds, targetEndpoint, client) + .stream() + .filter(Objects::nonNull) + .forEach(remoteHistory -> { + sendBundle(bundleIds, targetEndpoint, localHistory, + endpointTrackingMap, remoteHistory); + }); + + } + + private void sendBundle(List bundleIds, PublishingEndPoint targetEndpoint, PublishAuditHistory localHistory, + Map> endpointTrackingMap, PublishAuditHistory remoteHistory) { + try { + updateLocalPublishDatesFromRemote(localHistory, remoteHistory); + endpointTrackingMap.putAll(remoteHistory.getEndpointsMap()); + updateLocalEndpointDetailFromRemote(localHistory, targetEndpoint.getGroupId(), targetEndpoint.getId(), remoteHistory); + } catch (final Exception e) { + // An error occurred when retrieving the end-point's audit info. + // Usually caused by a network problem. + Logger.error(PublisherQueueJob.class, "An error occurred when updating audit status from " + + "endpoint=[" + targetEndpoint.toURL() + "], bundle=[" + bundleIds.get(0) + + "] : " + ExceptionUtil.getErrorMessage(e), e); + final String failedAuditUpdate = "failed-remote-group-" + System.currentTimeMillis(); + final EndpointDetail detail = new EndpointDetail(); + detail.setStatus(Status.FAILED_TO_PUBLISH.getCode()); + endpointTrackingMap.put(failedAuditUpdate, Map.of(failedAuditUpdate, detail)); + PushPublishLogger.log(this.getClass(), "Status update: Failed to update bundle audit status."); + } + } + /** * Updates the local Publish Audit History with the information retrieved from a remote * end-point. @@ -599,15 +643,25 @@ private void updateLocalPublishDatesFromRemote(final PublishAuditHistory localHi * retrieved. * @return The {@link PublishAuditHistory} of the bundle in the specified end-point. */ - private PublishAuditHistory getRemoteHistoryFromEndpoint(final PublishAuditStatus bundleAudit, + private List getRemoteHistoryFromEndpoint(final List bundleIds, final PublishingEndPoint targetEndpoint, - final Client client) { - final WebTarget webTarget = client.target(targetEndpoint.toURL() + "/api/auditPublishing"); - return PublishAuditHistory.getObjectFromString( - webTarget - .path("get") - .path(bundleAudit.getBundleId()).request().get(String.class)); - } + final Client client) throws DotDataException { + final WebTarget webTarget = client.target(targetEndpoint.toURL() + "/api/auditPublishing/getAll"); + + final String responseBody = webTarget + .request(MediaType.APPLICATION_JSON) + .post(Entity.entity(bundleIds, MediaType.APPLICATION_JSON)) + .readEntity(String.class); + + try { + return (List) JsonUtil.getObjectFromJson(responseBody, List.class) + .stream() + .map(item -> PublishAuditHistory.getObjectFromString(item.toString())) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new DotDataException(e); + } + } /** * Get the Publisher needed depending on the protocol of the end-points of @@ -758,4 +812,36 @@ private void updateAuditStatusErrorMsg(final PublishAuditHistory auditHistory, f new HashMap<>(Map.of(StringPool.BLANK, Map.of(StringPool.BLANK, endpointDetail)))); } + private static class BundlesToSent { + final PublishingEndPoint targetEndpoint; + List bundleIds; + + + public BundlesToSent(final PublishingEndPoint targetEndpoint) { + this.targetEndpoint = targetEndpoint; + this.bundleIds = new ArrayList<>(); + } + + public void add(String publisherStatus) { + bundleIds.add(publisherStatus); + } + + public List getPublishAuditStatuses() { + return bundleIds; + } + + public boolean limitReach() { + return bundleIds.size() >= MAX_SIZE_PP_AUDIT_PAYLOAD; + } + + public void reset() { + bundleIds.clear(); + } + + public boolean isEmpty() { + return bundleIds.isEmpty(); + } + } + + } diff --git a/dotCMS/src/main/java/com/dotcms/rest/AuditPublishingResource.java b/dotCMS/src/main/java/com/dotcms/rest/AuditPublishingResource.java index 5b591831a2fc..0003c5e8eb97 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/AuditPublishingResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/AuditPublishingResource.java @@ -3,15 +3,19 @@ import com.dotcms.publisher.business.DotPublisherException; import com.dotcms.publisher.business.PublishAuditAPI; import com.dotcms.publisher.business.PublishAuditStatus; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; + +import javax.ws.rs.*; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; + +import com.dotmarketing.util.Config; import com.dotmarketing.util.Logger; +import com.google.common.collect.Lists; +import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.List; + @Path("/auditPublishing") @Tag(name = "Publishing") public class AuditPublishingResource { @@ -35,5 +39,21 @@ public Response get(@PathParam("bundleId") String bundleId) { return Response.status(404).build(); } + @POST + @Path("/getAll") + @Produces(MediaType.APPLICATION_JSON) + public Response getAll( List bundleIds) { + try { + final List statuses = auditAPI.getPublishAuditStatuses(bundleIds); + + if(statuses != null) + return Response.ok( statuses.stream().map(status -> status.getStatusPojo().getSerialized() ) ).build(); + } catch (DotPublisherException e) { + Logger.warn(this, "error trying to get status for bundle "+bundleIds.get(0),e); + } + + return Response.status(404).build(); + } + } \ No newline at end of file diff --git a/dotCMS/src/main/webapp/WEB-INF/openapi/openapi.yaml b/dotCMS/src/main/webapp/WEB-INF/openapi/openapi.yaml index 8bc47b99e46e..f0ea39ee81b7 100644 --- a/dotCMS/src/main/webapp/WEB-INF/openapi/openapi.yaml +++ b/dotCMS/src/main/webapp/WEB-INF/openapi/openapi.yaml @@ -174,6 +174,23 @@ paths: description: default response tags: - Publishing + /auditPublishing/getAll: + post: + operationId: getAll + requestBody: + content: + '*/*': + schema: + type: array + items: + type: string + responses: + default: + content: + application/json: {} + description: default response + tags: + - Publishing /bundle: post: operationId: uploadBundleAsync @@ -1679,7 +1696,7 @@ paths: - JavaScript /license/all/{params}: get: - operationId: getAll + operationId: getAll_1 parameters: - in: path name: params @@ -12571,7 +12588,7 @@ paths: - System Storage /v1/system-table: get: - operationId: getAll_1 + operationId: getAll_2 responses: default: content: