From e5b6158d6b297f32b523de9546bfb195c583ef11 Mon Sep 17 00:00:00 2001 From: Nicolas Peltier Date: Wed, 17 May 2017 17:14:25 +0200 Subject: [PATCH] SLING-6623 allow async execution of pipes - moved execution code to a central place, under plumber implementation, - added status, and bufferization of the commits (async execution means potentially long executions), - added job registering & processing capabilities for plumber, based on a configurable service user (will update documentation accordingly) --- contrib/extensions/sling-pipes/pom.xml | 7 +- .../java/org/apache/sling/pipes/BasePipe.java | 10 + .../org/apache/sling/pipes/OutputWriter.java | 60 ++++- .../java/org/apache/sling/pipes/Pipe.java | 7 + .../java/org/apache/sling/pipes/Plumber.java | 38 +++- .../pipes/internal/DefaultOutputWriter.java | 9 +- .../sling/pipes/internal/NopWriter.java | 47 ++++ .../sling/pipes/internal/PlumberImpl.java | 208 ++++++++++++++---- .../sling/pipes/internal/PlumberServlet.java | 112 +++++----- .../org/apache/sling/pipes/package-info.java | 2 +- .../apache/sling/pipes/AbstractPipeTest.java | 8 +- .../pipes/internal/PlumberServletTest.java | 2 +- 12 files changed, 376 insertions(+), 134 deletions(-) create mode 100644 contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/NopWriter.java diff --git a/contrib/extensions/sling-pipes/pom.xml b/contrib/extensions/sling-pipes/pom.xml index 1ba63aeaf2..77fb03032e 100644 --- a/contrib/extensions/sling-pipes/pom.xml +++ b/contrib/extensions/sling-pipes/pom.xml @@ -30,7 +30,7 @@ org.apache.sling.pipes bundle - 0.0.11-SNAPSHOT + 1.0.0-SNAPSHOT Apache Sling Pipes bulk content changes tool @@ -137,6 +137,11 @@ 2.0.6 provided + + org.apache.sling + org.apache.sling.event.api + 1.0.0 + junit diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/BasePipe.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/BasePipe.java index afaad61963..b644b13354 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/BasePipe.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/BasePipe.java @@ -37,6 +37,11 @@ public class BasePipe implements Pipe { public static final String RESOURCE_TYPE = "slingPipes/base"; public static final String DRYRUN_KEY = "dryRun"; + public static final String READ_ONLY = "readOnly"; + public static final String PN_STATUS = "status"; + public static final String PN_STATUS_MODIFIED = "statusModified"; + public static final String STATUS_STARTED = "started"; + public static final String STATUS_FINISHED = "finished"; protected static final String DRYRUN_EXPR = "${" + DRYRUN_KEY + "}"; protected ResourceResolver resolver; @@ -63,6 +68,11 @@ public void setParent(ContainerPipe parent) { this.parent = parent; } + @Override + public Resource getResource() { + return resource; + } + protected Plumber plumber; private String name; diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/OutputWriter.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/OutputWriter.java index 3825b2aa60..13e1bc7a15 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/OutputWriter.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/OutputWriter.java @@ -26,41 +26,83 @@ /** * defines how pipe's output get written to a servlet response */ -public interface OutputWriter { +public abstract class OutputWriter { - String KEY_SIZE = "size"; + public static final String KEY_SIZE = "size"; - String KEY_ITEMS = "items"; + public static final String KEY_ITEMS = "items"; + + public static final String PARAM_SIZE = KEY_SIZE; + + public static final int NB_MAX = 10; + + protected int size; + + protected int max = NB_MAX; + + protected Pipe pipe; /** * * @param request current request * @return true if this writer handles that request */ - boolean handleRequest(SlingHttpServletRequest request); + public abstract boolean handleRequest(SlingHttpServletRequest request); /** * Init the writer, writes beginning of the output * @param request request from which writer will output * @param response response on which writer will output - * @param pipe pipe whose output will be written * @throws IOException error handling streams * @throws JSONException in case invalid json is written */ - void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException; + public void init(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException{ + max = request.getParameter(PARAM_SIZE) != null ? Integer.parseInt(request.getParameter(PARAM_SIZE)) : NB_MAX; + if (max < 0) { + max = Integer.MAX_VALUE; + } + initInternal(request, response); + } + + /** + * Init the writer, writes beginning of the output + * @param request request from which writer will output + * @param response response on which writer will output + * @throws IOException error handling streams + * @throws JSONException in case invalid json is written + */ + protected abstract void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException; /** * Write a given resource * @param resource resource that will be written * @throws JSONException in case write fails */ - void writeItem(Resource resource) throws JSONException; + public void write(Resource resource) throws JSONException{ + if (size++ < max) { + writeItem(resource); + } + } + + /** + * Write a given resource + * @param resource resource that will be written + * @throws JSONException in case write fails + */ + protected abstract void writeItem(Resource resource) throws JSONException; /** * writes the end of the output - * @param size size of the overall result * @throws JSONException in case invalid json is written */ - void ends(int size) throws JSONException; + public abstract void ends() throws JSONException; + + /** + * + * @param pipe + */ + public void setPipe(Pipe pipe) { + this.pipe = pipe; + } } diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Pipe.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Pipe.java index 30ceaceb5d..d650dbbd5a 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Pipe.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Pipe.java @@ -89,6 +89,13 @@ public interface Pipe { */ Resource getInput(); + + /** + * get the pipe configuration resource + * @return + */ + Resource getResource(); + /** * returns the binding output used in container pipe's expression * @return object, either value map or something else, that will be used in nashorn for computing expressions diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Plumber.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Plumber.java index 12ec4eb695..d831167ffe 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Plumber.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/Plumber.java @@ -19,6 +19,7 @@ import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.event.jobs.Job; import java.util.Map; import java.util.Set; @@ -37,37 +38,38 @@ public interface Plumber { */ Pipe getPipe(Resource resource); + /** + * executes in a background thread + * @param resolver + * @param path + * @param bindings + * @return Job if registered, null otherwise + */ + Job executeAsync(ResourceResolver resolver, String path, Map bindings); + /** * Executes a pipe at a certain path * @param resolver resource resolver with which pipe will be executed * @param path path of a valid pipe configuration * @param bindings bindings to add to the execution of the pipe, can be null + * @param writer output of the pipe * @param save in case that pipe writes anything, wether the plumber should save changes or not * @throws Exception in case execution fails * @return set of paths of output resources */ - Set execute(ResourceResolver resolver, String path, Map bindings, boolean save) throws Exception; + Set execute(ResourceResolver resolver, String path, Map bindings, OutputWriter writer, boolean save) throws Exception; /** * Executes a given pipe * @param resolver resource resolver with which pipe will be executed * @param pipe pipe to execute * @param bindings bindings to add to the execution of the pipe, can be null + * @param writer output of the pipe * @param save in case that pipe writes anything, wether the plumber should save changes or not * @throws Exception in case execution fails * @return set of paths of output resources */ - Set execute(ResourceResolver resolver, Pipe pipe, Map bindings, boolean save) throws Exception; - - /** - * Persist some pipe changes, and eventually distribute changes - * @param resolver resolver with which changes will be persisted - * @param pipe pipe from which the change occurred - * @param paths set of changed paths - * @throws PersistenceException in case persisting fails - */ - - void persist(ResourceResolver resolver, Pipe pipe, Set paths) throws PersistenceException; + Set execute(ResourceResolver resolver, Pipe pipe, Map bindings, OutputWriter writer, boolean save) throws Exception; /** * Registers @@ -76,5 +78,17 @@ public interface Plumber { */ void registerPipe(String type, Class pipeClass); + /** + * status of the pipe + * @param pipeResource resource corresponding to the pipe + * @return + */ + String getStatus(Resource pipeResource); + /** + * returns true if the pipe is considered to be running + * @param pipeResource resource corresponding to the pipe + * @return + */ + boolean isRunning(Resource pipeResource); } diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java index a86e48a69f..7005598f34 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/DefaultOutputWriter.java @@ -29,23 +29,20 @@ /** * default output writer with size and output resources' path */ -public class DefaultOutputWriter implements OutputWriter { +public class DefaultOutputWriter extends OutputWriter { protected JSONWriter writer; - protected Pipe pipe; - @Override public boolean handleRequest(SlingHttpServletRequest request) { return true; } @Override - public void init(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException { + protected void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException { response.setCharacterEncoding("utf-8"); response.setContentType("application/json"); writer = new JSONWriter(response.getWriter()); - this.pipe = pipe; writer.object(); writer.key(KEY_ITEMS); writer.array(); @@ -57,7 +54,7 @@ public void writeItem(Resource resource) throws JSONException { } @Override - public void ends(int size) throws JSONException { + public void ends() throws JSONException { writer.endArray(); writer.key(KEY_SIZE).value(size); writer.endObject(); diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/NopWriter.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/NopWriter.java new file mode 100644 index 0000000000..cd26e70615 --- /dev/null +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/NopWriter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sling.pipes.internal; + +import org.apache.sling.api.SlingHttpServletRequest; +import org.apache.sling.api.SlingHttpServletResponse; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.commons.json.JSONException; +import org.apache.sling.pipes.OutputWriter; + +import java.io.IOException; + +public class NopWriter extends OutputWriter { + @Override + public boolean handleRequest(SlingHttpServletRequest request) { + return false; + } + + @Override + protected void initInternal(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException { + //nop + } + + @Override + protected void writeItem(Resource resource) throws JSONException { + //nop + } + + @Override + public void ends() throws JSONException { + //nop + } +} diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java index 87645b031c..a10673c615 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java @@ -16,49 +16,75 @@ */ package org.apache.sling.pipes.internal; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.commons.lang.StringUtils; -import org.apache.sling.api.resource.PersistenceException; -import org.apache.sling.api.resource.Resource; -import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.SlingConstants; +import org.apache.sling.api.resource.*; import org.apache.sling.distribution.DistributionRequest; import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.DistributionResponse; import org.apache.sling.distribution.Distributor; import org.apache.sling.distribution.SimpleDistributionRequest; -import org.apache.sling.pipes.BasePipe; -import org.apache.sling.pipes.ContainerPipe; -import org.apache.sling.pipes.Pipe; -import org.apache.sling.pipes.Plumber; -import org.apache.sling.pipes.ReferencePipe; +import org.apache.sling.event.jobs.Job; +import org.apache.sling.event.jobs.JobManager; +import org.apache.sling.event.jobs.consumer.JobConsumer; +import org.apache.sling.pipes.*; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.component.annotations.ReferenceCardinality; import org.osgi.service.component.annotations.ReferencePolicy; +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.Designate; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jcr.RepositoryException; + +import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE; +import static org.apache.sling.pipes.BasePipe.*; + /** - * implements plumber interface, and registers default pipes + * implements plumber interface, registers default pipes, and provides execution facilities */ -@Component(service = {Plumber.class}) -public class PlumberImpl implements Plumber { +@Component(service = {Plumber.class, JobConsumer.class}, property = { + JobConsumer.PROPERTY_TOPICS +"="+PlumberImpl.SLING_EVENT_TOPIC +}) +@Designate(ocd = PlumberImpl.Configuration.class) +public class PlumberImpl implements Plumber, JobConsumer { private final Logger log = LoggerFactory.getLogger(this.getClass()); + public static final int DEFAULT_BUFFER_SIZE = 1000; + + @ObjectClassDefinition(name="Apache Sling Pipes : Plumber configuration") + public @interface Configuration { + @AttributeDefinition(description="Number of iterations after which plumber should saves a pipe execution") + int bufferSize() default PlumberImpl.DEFAULT_BUFFER_SIZE; + + @AttributeDefinition(description="Name of service user, with appropriate rights, that will be used for async execution") + String serviceUser(); + + @AttributeDefinition(description="Users allowed to register async pipes") + String[] authorizedUsers() default {"admin"}; + } Map> registry; - @Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL) - protected volatile Distributor distributor = null; + public static final String SLING_EVENT_TOPIC = "org/apache/sling/pipes/topic"; + + private int bufferSize; + + private Map serviceUser; + + private List allowedUsers; @Activate - public void activate(){ + public void activate(Configuration configuration){ + bufferSize = configuration.bufferSize(); + serviceUser = Collections.singletonMap(SUBSERVICE, configuration.serviceUser()); + allowedUsers = Arrays.asList(configuration.authorizedUsers()); registry = new HashMap<>(); registerPipe(BasePipe.RESOURCE_TYPE, BasePipe.class); registerPipe(ContainerPipe.RESOURCE_TYPE, ContainerPipe.class); @@ -75,8 +101,18 @@ public void activate(){ registerPipe(PathPipe.RESOURCE_TYPE, PathPipe.class); registerPipe(FilterPipe.RESOURCE_TYPE, FilterPipe.class); registerPipe(NotPipe.RESOURCE_TYPE, NotPipe.class); + } + @Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL) + protected volatile Distributor distributor = null; + + @Reference + JobManager jobManager; + + @Reference + ResourceResolverFactory factory; + @Override public Pipe getPipe(Resource resource) { if ((resource == null) || !registry.containsKey(resource.getResourceType())) { @@ -93,43 +129,84 @@ public Pipe getPipe(Resource resource) { } @Override - public Set execute(ResourceResolver resolver, String path, Map additionalBindings, boolean save) throws Exception { + public Job executeAsync(ResourceResolver resolver, String path, Map bindings) { + if (allowedUsers.contains(resolver.getUserID())) { + if (StringUtils.isBlank((String)serviceUser.get(SUBSERVICE))) { + log.error("please configure plumber service user"); + } + final Map props = new HashMap(); + props.put(SlingConstants.PROPERTY_PATH, path); + props.put(PipeBindings.NN_ADDITIONALBINDINGS, bindings); + return jobManager.addJob(SLING_EVENT_TOPIC, props); + } + return null; + } + + @Override + public Set execute(ResourceResolver resolver, String path, Map additionalBindings, OutputWriter writer, boolean save) throws Exception { Resource pipeResource = resolver.getResource(path); Pipe pipe = getPipe(pipeResource); if (pipe == null) { throw new Exception("unable to build pipe based on configuration at " + path); } - return execute(resolver, pipe, additionalBindings, save); + if (additionalBindings != null && (Boolean)additionalBindings.getOrDefault(BasePipe.READ_ONLY, true) && pipe.modifiesContent()) { + throw new Exception("This pipe modifies content, you should use a POST request"); + } + return execute(resolver, pipe, additionalBindings, writer, save); } @Override - public Set execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, boolean save) throws Exception { - if (additionalBindings != null && pipe instanceof ContainerPipe){ - pipe.getBindings().addBindings(additionalBindings); - } + public Set execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, OutputWriter writer, boolean save) throws Exception { + try { + if (additionalBindings != null && pipe instanceof ContainerPipe){ + pipe.getBindings().addBindings(additionalBindings); + } + log.info("[{}] execution starts, save ({})", pipe, save); + writer.setPipe(pipe); + if (isRunning(pipe.getResource())){ + throw new RuntimeException("Pipe is already running"); + } + writeStatus(pipe, STATUS_STARTED); + resolver.commit(); - log.info("[{}] execution starts, save ({})", pipe, save); - Set set = new HashSet<>(); - for (Iterator it = pipe.getOutput(); it.hasNext();){ - Resource resource = it.next(); - if (resource != null) { - log.debug("[{}] retrieved {}", pipe.getName(), resource.getPath()); - set.add(resource.getPath()); + Set set = new HashSet<>(); + for (Iterator it = pipe.getOutput(); it.hasNext();){ + Resource resource = it.next(); + if (resource != null) { + log.debug("[{}] retrieved {}", pipe.getName(), resource.getPath()); + writer.write(resource); + set.add(resource.getPath()); + persist(resolver, pipe, set, resource); + } } + if (save && pipe.modifiesContent()) { + persist(resolver, pipe, set, null); + } + log.info("[{}] done executing.", pipe.getName()); + writer.ends(); + return set; + } finally { + writeStatus(pipe, STATUS_FINISHED); + resolver.commit(); } - if (save) { - persist(resolver, pipe, set); - } - log.info("[{}] done executing.", pipe.getName()); - return set; } - @Override - public void persist(ResourceResolver resolver, Pipe pipe, Set paths) throws PersistenceException { + /** + * Persists pipe change if big enough, or ended, and eventually distribute changes + * @param resolver + * @param pipe + * @param paths + * @param currentResource if running, null if ended + * @throws PersistenceException + */ + protected void persist(ResourceResolver resolver, Pipe pipe, Set paths, Resource currentResource) throws Exception { if (pipe.modifiesContent() && resolver.hasChanges() && !pipe.isDryRun()){ - log.info("[{}] saving changes...", pipe.getName()); - resolver.commit(); - if (distributor != null && StringUtils.isNotBlank(pipe.getDistributionAgent())) { + if (currentResource == null || paths.size() % bufferSize == 0){ + log.info("[{}] saving changes...", pipe.getName()); + writeStatus(pipe, currentResource == null ? STATUS_FINISHED : currentResource.getPath()); + resolver.commit(); + } + if (currentResource == null && distributor != null && StringUtils.isNotBlank(pipe.getDistributionAgent())) { log.info("a distribution agent is configured, will try to distribute the changes"); DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, true, paths.toArray(new String[paths.size()])); DistributionResponse response = distributor.distribute(pipe.getDistributionAgent(), resolver, request); @@ -142,4 +219,51 @@ public void persist(ResourceResolver resolver, Pipe pipe, Set paths) thr public void registerPipe(String type, Class pipeClass) { registry.put(type, pipeClass); } + + /** + * writes the status of the pipe + * @param pipe + * @param status + */ + protected void writeStatus(Pipe pipe, String status) throws RepositoryException { + if (StringUtils.isNotBlank(status)){ + ModifiableValueMap vm = pipe.getResource().adaptTo(ModifiableValueMap.class); + vm.put(PN_STATUS, status); + Calendar cal = new GregorianCalendar(); + cal.setTime(new Date()); + vm.put(PN_STATUS_MODIFIED, cal); + } + } + + @Override + public String getStatus(Resource pipeResource) { + Resource statusResource = pipeResource.getChild(PN_STATUS); + if (statusResource != null){ + String status = statusResource.adaptTo(String.class); + if (StringUtils.isNotBlank(status)){ + return status; + } + } + return STATUS_FINISHED; + } + + @Override + public boolean isRunning(Resource pipeResource) { + return !getStatus(pipeResource).equals(STATUS_FINISHED); + } + + @Override + public JobResult process(Job job) { + try(ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)){ + String path = (String)job.getProperty(SlingConstants.PROPERTY_PATH); + Map bindings = (Map)job.getProperty(PipeBindings.NN_ADDITIONALBINDINGS); + execute(resolver, path, bindings, new NopWriter(), true); + return JobResult.OK; + } catch (LoginException e) { + log.error("unable to retrieve resolver for executing scheduled pipe", e); + } catch (Exception e) { + log.error("failed to execute the pipe", e); + } + return JobResult.FAILED; + } } diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java index 60b7169af8..16728406f5 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/internal/PlumberServlet.java @@ -17,30 +17,21 @@ package org.apache.sling.pipes.internal; import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; +import java.util.*; import javax.servlet.Servlet; import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.commons.lang.StringUtils; import org.apache.sling.api.SlingHttpServletRequest; import org.apache.sling.api.SlingHttpServletResponse; -import org.apache.sling.api.resource.Resource; -import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.servlets.SlingAllMethodsServlet; import org.apache.sling.api.servlets.ServletResolverConstants; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; -import org.apache.sling.pipes.BasePipe; -import org.apache.sling.pipes.ContainerPipe; -import org.apache.sling.pipes.OutputWriter; -import org.apache.sling.pipes.Pipe; -import org.apache.sling.pipes.PipeBindings; -import org.apache.sling.pipes.Plumber; +import org.apache.sling.event.jobs.Job; +import org.apache.sling.pipes.*; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; @@ -50,7 +41,6 @@ /** * Servlet executing plumber for a pipe path given as 'path' parameter, * it can also be launched against a container pipe resource directly (no need for path parameter) - * */ @Component(service = {Servlet.class}, property= { @@ -70,16 +60,18 @@ public class PlumberServlet extends SlingAllMethodsServlet { protected static final String PARAM_BINDINGS = "bindings"; - protected static final String PARAM_SIZE = "size"; - - public static final int NB_MAX = 10; + protected static final String PARAM_ASYNC = "async"; @Reference Plumber plumber; @Override protected void doGet(SlingHttpServletRequest request, SlingHttpServletResponse response) throws ServletException, IOException { - execute(request, response, false); + if (Arrays.asList(request.getRequestPathInfo().getSelectors()).contains(BasePipe.PN_STATUS)){ + response.getWriter().append(plumber.getStatus(request.getResource())); + } else { + execute(request, response, false); + } } @Override @@ -93,62 +85,60 @@ protected void execute(SlingHttpServletRequest request, SlingHttpServletResponse if (StringUtils.isBlank(path)) { throw new Exception("path should be provided"); } - String dryRun = request.getParameter(BasePipe.DRYRUN_KEY); - int size = request.getParameter(PARAM_SIZE) != null ? Integer.parseInt(request.getParameter(PARAM_SIZE)) : NB_MAX; - if (size < 0) { - size = Integer.MAX_VALUE; - } - - ResourceResolver resolver = request.getResourceResolver(); - Resource pipeResource = resolver.getResource(path); - Pipe pipe = plumber.getPipe(pipeResource); - PipeBindings bindings = pipe.getBindings(); - - if (StringUtils.isNotBlank(dryRun) && dryRun.equals(Boolean.TRUE.toString())) { - bindings.addBinding(BasePipe.DRYRUN_KEY, true); - } - - String paramBindings = request.getParameter(PARAM_BINDINGS); - if (StringUtils.isNotBlank(paramBindings)){ - try { - JSONObject bindingJSON = new JSONObject(paramBindings); - for (Iterator keys = bindingJSON.keys(); keys.hasNext();){ - String key = keys.next(); - bindings.addBinding(key, bindingJSON.get(key)); - } - } catch (Exception e){ - log.error("Unable to retrieve bindings information", e); + Map bindings = getBindingsFromRequest(request, writeAllowed); + String asyncParam = request.getParameter(PARAM_ASYNC); + if (StringUtils.isNotBlank(asyncParam) && asyncParam.equals(Boolean.TRUE.toString())){ + Job job = plumber.executeAsync(request.getResourceResolver(), path, bindings); + if (job != null){ + response.getWriter().append("pipe execution registered as " + job.getId()); + response.setStatus(HttpServletResponse.SC_CREATED); + } else { + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Some issue with your request, or server not being ready for async execution"); } + } else { + OutputWriter writer = getWriter(request, response); + plumber.execute(request.getResourceResolver(), path, bindings, writer, true); } - if (!writeAllowed && pipe.modifiesContent()) { - throw new Exception("This pipe modifies content, you should use a POST request"); - } - OutputWriter writer = getWriter(request, response, pipe); - int i = 0; - Iterator resourceIterator = pipe.getOutput(); - Set paths = new HashSet(); - while (resourceIterator.hasNext()){ - Resource resource = resourceIterator.next(); - paths.add(resource.getPath()); - if (i++ < size) { - writer.writeItem(resource); - } - } - writer.ends(i); - plumber.persist(resolver, pipe, paths); } catch (Exception e) { throw new ServletException(e); } } - OutputWriter getWriter(SlingHttpServletRequest request, SlingHttpServletResponse response, Pipe pipe) throws IOException, JSONException { + /** + * Converts request into pipe bindings + * @param request + * @return + */ + protected Map getBindingsFromRequest(SlingHttpServletRequest request, boolean writeAllowed){ + Map bindings = new HashMap<>(); + String dryRun = request.getParameter(BasePipe.DRYRUN_KEY); + if (StringUtils.isNotBlank(dryRun) && dryRun.equals(Boolean.TRUE.toString())) { + bindings.put(BasePipe.DRYRUN_KEY, true); + } + String paramBindings = request.getParameter(PARAM_BINDINGS); + if (StringUtils.isNotBlank(paramBindings)){ + try { + JSONObject bindingJSON = new JSONObject(paramBindings); + for (Iterator keys = bindingJSON.keys(); keys.hasNext();){ + String key = keys.next(); + bindings.put(key, bindingJSON.get(key)); + } + } catch (Exception e){ + log.error("Unable to retrieve bindings information", e); + } + } + bindings.put(BasePipe.READ_ONLY, !writeAllowed); + return bindings; + } + + OutputWriter getWriter(SlingHttpServletRequest request, SlingHttpServletResponse response) throws IOException, JSONException { OutputWriter[] candidates = new OutputWriter[]{new CustomJsonWriter(), new CustomWriter(), new DefaultOutputWriter()}; for (OutputWriter candidate : candidates) { if (candidate.handleRequest(request)) { - candidate.init(request, response, pipe); + candidate.init(request, response); return candidate; } } return null; } -} +} \ No newline at end of file diff --git a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/package-info.java b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/package-info.java index b5e84e40cc..44b5df9d17 100644 --- a/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/package-info.java +++ b/contrib/extensions/sling-pipes/src/main/java/org/apache/sling/pipes/package-info.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Version("0.0.10") +@Version("1.0.0") package org.apache.sling.pipes; import org.osgi.annotation.versioning.Version; diff --git a/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java b/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java index 08e8047a73..a6a1afb7f9 100644 --- a/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java +++ b/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/AbstractPipeTest.java @@ -30,6 +30,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * this abstract class for pipes implements a plumber with all registered pipes, plus some test ones, and give some paths, @@ -53,7 +55,11 @@ public class AbstractPipeTest { @Before public void setup(){ PlumberImpl plumberImpl = new PlumberImpl(); - plumberImpl.activate(); + PlumberImpl.Configuration configuration = mock(PlumberImpl.Configuration.class); + when(configuration.authorizedUsers()).thenReturn(new String[]{}); + when(configuration.serviceUser()).thenReturn(null); + when(configuration.bufferSize()).thenReturn(PlumberImpl.DEFAULT_BUFFER_SIZE); + plumberImpl.activate(configuration); plumberImpl.registerPipe("slingPipes/dummyNull", DummyNull.class); plumberImpl.registerPipe("slingPipes/dummySearch", DummySearch.class); plumber = plumberImpl; diff --git a/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java b/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java index 5c18229f3b..23d6775632 100644 --- a/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java +++ b/contrib/extensions/sling-pipes/src/test/java/org/apache/sling/pipes/internal/PlumberServletTest.java @@ -205,7 +205,7 @@ public static SlingHttpServletRequest mockPlumberServletRequest(ResourceResolver when(request.getParameter(PlumberServlet.PARAM_BINDINGS)).thenReturn(bindings); when(request.getParameter(CustomWriter.PARAM_WRITER)).thenReturn(writer); when(request.getParameter(BasePipe.DRYRUN_KEY)).thenReturn(dryRun); - when(request.getParameter(PlumberServlet.PARAM_SIZE)).thenReturn(size); + when(request.getParameter(OutputWriter.PARAM_SIZE)).thenReturn(size); return request; }