Skip to content

Commit

Permalink
Merge pull request #1388 from Aklakan/gh-1387
Browse files Browse the repository at this point in the history
GH-1387 Improved custom service executor extension system
  • Loading branch information
afs committed Jul 3, 2022
2 parents 7456469 + b2ed4de commit db4a750
Show file tree
Hide file tree
Showing 20 changed files with 745 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

package org.apache.jena.sparql.engine;

import java.util.Iterator ;

import org.apache.jena.atlas.lib.Closeable ;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.sparql.engine.binding.Binding ;
import org.apache.jena.sparql.util.PrintSerializable ;

/** Root of query iterators in ARQ. */

public interface QueryIterator extends Closeable, Iterator<Binding>, PrintSerializable
public interface QueryIterator extends IteratorCloseable<Binding>, PrintSerializable
{
/** Get next binding */
public Binding nextBinding() ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.jena.sparql.expr.ExprList ;
import org.apache.jena.sparql.procedure.ProcEval ;
import org.apache.jena.sparql.procedure.Procedure ;
import org.apache.jena.sparql.service.ServiceExecutorRegistry;

/**
* Turn an Op expression into an execution of QueryIterators.
Expand Down Expand Up @@ -305,7 +306,7 @@ protected QueryIterator execute(OpFilter opFilter, QueryIterator input) {
}

protected QueryIterator execute(OpService opService, QueryIterator input) {
return new QueryIterService(input, opService, execCxt) ;
return ServiceExecutorRegistry.exec(input, opService, execCxt);
}

// Quad form, "GRAPH ?g {}" Flip back to OpGraph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import org.apache.jena.sparql.engine.iterator.QueryIterSingleton;
import org.apache.jena.sparql.engine.main.QC ;
import org.apache.jena.sparql.exec.http.Service;
import org.apache.jena.sparql.service.ServiceExecution;
import org.apache.jena.sparql.service.ServiceExecutorFactory;
import org.apache.jena.sparql.service.ServiceExecutorRegistry;
import org.apache.jena.sparql.service.single.ChainingServiceExecutor;
import org.apache.jena.sparql.util.Context;


/**
* This class continues to exist for compatibility with legacy service extensions.
* New code should register extensions at a {@link ServiceExecutorRegistry}.
*/
@Deprecated(since = "4.6.0")
public class QueryIterService extends QueryIterRepeatApply
{
protected OpService opService ;
Expand All @@ -59,20 +62,21 @@ protected QueryIterator nextStage(Binding outerBinding) {
ExecutionContext execCxt = getExecContext();
Context cxt = execCxt.getContext();
ServiceExecutorRegistry registry = ServiceExecutorRegistry.get(cxt);
ServiceExecution svcExec = null;
QueryIterator svcExec = null;
OpService substitutedOp = (OpService)QC.substitute(opService, outerBinding);

try {
// ---- Find handler
if ( registry != null ) {
for ( ServiceExecutorFactory factory : registry.getFactories() ) {
// FIXME This needs to be updated for chainable executors
for ( ChainingServiceExecutor factory : registry.getSingleChain() ) {
// Internal consistency check
if ( factory == null ) {
Log.warn(this, "SERVICE <" + opService.getService().toString() + ">: Null item in custom ServiceExecutionRegistry");
continue;
}

svcExec = factory.createExecutor(substitutedOp, opService, outerBinding, execCxt);
svcExec = factory.createExecution(substitutedOp, opService, outerBinding, execCxt, null);
if ( svcExec != null )
break;
}
Expand All @@ -81,8 +85,7 @@ protected QueryIterator nextStage(Binding outerBinding) {
// ---- Execute
if ( svcExec == null )
throw new QueryExecException("No SERVICE handler");
QueryIterator qIter = svcExec.exec();
qIter = QueryIter.makeTracked(qIter, getExecContext());
QueryIterator qIter = QueryIter.makeTracked(svcExec, getExecContext());
// Need to put the outerBinding as parent to every binding of the service call.
// There should be no variables in common because of the OpSubstitute.substitute
return new QueryIterCommonParent(qIter, outerBinding, getExecContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
package org.apache.jena.sparql.service;

import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.main.iterator.QueryIterService;

/**
* Execution of a SERVICE clause in the context of {@link QueryIterService} applying an input binding.
* @see ServiceExecutorFactory
/**
* Execution of a SERVICE clause in the context of {@link QueryIterService} applying an input binding.
* @see ServiceExecutor
* @see ServiceExecutorRegistry
*/
*
* @deprecated Deprecated in favor of QueryIterators that initialize lazily
*/
@Deprecated(since = "4.6.0")
public interface ServiceExecution {
public QueryIterator exec();
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.service.single.ChainingServiceExecutor;
import org.apache.jena.sparql.service.single.ServiceExecutor;

/**
* Interface for custom handling of service execution requests.
*/
/** Compatibility interface. Consider migrating legacy code to {@link ChainingServiceExecutor} or {@link ServiceExecutor} */
@Deprecated(since = "4.6.0")
@FunctionalInterface
public interface ServiceExecutorFactory {
/**
* If this factory cannot handle the execution request then this method should return null.
* Otherwise, a {@link ServiceExecution} with the corresponding {@link QueryIterator} is returned.
*
* @return A QueryIterator if this factory can handle the request, or null otherwise.
*/
public ServiceExecution createExecutor(OpService opExecute, OpService original, Binding binding, ExecutionContext execCxt);
public interface ServiceExecutorFactory
extends ServiceExecutor
{
/**
 * Adapter from the legacy factory method to the {@link ServiceExecutor} contract.
 * Asks {@link #createExecutor} for a {@link ServiceExecution}; a null answer is passed
 * through as null, otherwise the execution is started and its iterator is returned.
 *
 * @return the iterator produced by {@link ServiceExecution#exec()}, or null if
 *         {@code createExecutor} returned null
 */
@Override
default QueryIterator createExecution(OpService opExecute, OpService original, Binding binding, ExecutionContext execCxt) {
    ServiceExecution execution = createExecutor(opExecute, original, binding, execCxt);
    if (execution == null) {
        // No execution was offered for this request: report that as null as well.
        return null;
    }
    return execution.exec();
}

/**
* Create the {@link ServiceExecution} for a service request.
* The default {@code createExecution} method of this interface calls this method,
* passes a null return value through as null, and otherwise returns the result of
* {@link ServiceExecution#exec()}.
*
* @return a service execution, or null (which {@code createExecution} propagates as null)
*/
ServiceExecution createExecutor(OpService opExecute, OpService original, Binding binding, ExecutionContext execCxt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,67 @@
package org.apache.jena.sparql.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.algebra.op.OpService;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.service.bulk.ChainingServiceExecutorBulk;
import org.apache.jena.sparql.service.bulk.ServiceExecutorBulk;
import org.apache.jena.sparql.service.bulk.ServiceExecutorBulkOverRegistry;
import org.apache.jena.sparql.service.single.ChainingServiceExecutor;
import org.apache.jena.sparql.service.single.ChainingServiceExecutorWrapper;
import org.apache.jena.sparql.service.single.ServiceExecutor;
import org.apache.jena.sparql.service.single.ServiceExecutorHttp;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.exec.http.*;

/**
* Registry for service executors that can be extended with custom ones.
* Bulk and single (=non-bulk) executors are maintained in two separate lists.
*
* Default execution will always start with the bulk list first.
* Once that list is exhausted by means of all bulk executors having delegated the request,
* then the non-bulk ones will be considered.
* There is no need to explicitly register a bulk-to-non-bulk bridge.
*/
public class ServiceExecutorRegistry
{
// A list of custom service executors which are tried in the given order
List<ServiceExecutorFactory> registry = new ArrayList<>();
// A list of bulk service executors which are tried in the given order
List<ChainingServiceExecutorBulk> bulkChain = new ArrayList<>();

// A list of single (non-bulk) service executors which are tried in the given order
// This list is only considered after the bulk registry
List<ChainingServiceExecutor> singleChain = new ArrayList<>();

/**
 * Look up the registry stored in the global ARQ context.
 * This is a plain {@link #get(Context)} lookup on {@code ARQ.getContext()};
 * nothing is created here.
 */
public static ServiceExecutorRegistry standardRegistry() {
    return get(ARQ.getContext());
}

/** A "call with SPARQL query" service execution factory. */
public static ServiceExecutorFactory httpService = (op, opx, binding, execCxt) -> ()->Service.exec(op, execCxt.getContext());
/** A "call with SPARQL query" service executor. */
public static ServiceExecutor httpService = new ServiceExecutorHttp();

/**
* Add the default executor(s) to the given registry without inspecting what it already contains.
* Concretely, this adds only {@link #httpService}.
*
* @param registry the registry that receives the default executor(s)
*/
public static void initWithDefaults(ServiceExecutorRegistry registry) {
registry.add(httpService);
}

public static void init() {
// Initialize if there is no registry already set
ServiceExecutorRegistry reg = new ServiceExecutorRegistry() ;
reg.add(httpService);
ServiceExecutorRegistry reg = new ServiceExecutorRegistry();
initWithDefaults(reg);
set(ARQ.getContext(), reg) ;
}

/**
* Return the global instance from the ARQ context; create that instance if needed.
* Never returns null.
*/
public static ServiceExecutorRegistry get()
{
// Initialize if there is no registry already set
Expand All @@ -61,6 +93,16 @@ public static ServiceExecutorRegistry get()
return reg ;
}

/**
 * Pick the registry to use for the given context.
 * The context's own registry, as found by {@link #get(Context)}, wins;
 * if that lookup yields null the global registry from {@link #get()} is used instead.
 */
public static ServiceExecutorRegistry chooseRegistry(Context context) {
    ServiceExecutorRegistry fromContext = ServiceExecutorRegistry.get(context);
    return fromContext != null ? fromContext : get();
}

/** Return the registry from the given context only; null if there is none */
public static ServiceExecutorRegistry get(Context context)
{
if ( context == null )
Expand All @@ -76,29 +118,102 @@ public static void set(Context context, ServiceExecutorRegistry reg)
/** Create a registry. The constructor itself registers no executors. */
public ServiceExecutorRegistry()
{}

/** Create an independent copy of the registry */
public ServiceExecutorRegistry copy() {
ServiceExecutorRegistry result = new ServiceExecutorRegistry();
result.getFactories().addAll(getFactories());
return result;
/*
* Non-bulk API
*/

/**
 * Put the given executor at the front (index 0) of the per-binding chain.
 *
 * @param f the link to prepend; must not be null
 * @return this registry, to allow call chaining
 * @throws NullPointerException if {@code f} is null
 */
public ServiceExecutorRegistry addSingleLink(ChainingServiceExecutor f) {
    ChainingServiceExecutor link = Objects.requireNonNull(f);
    singleChain.add(0, link);
    return this;
}

/**
* Remove the given service executor from the per-binding chain.
* Delegates to {@link List#remove(Object)} on the chain.
*
* @param f the link to remove
* @return this registry, to allow call chaining
*/
public ServiceExecutorRegistry removeSingleLink(ChainingServiceExecutor f) {
singleChain.remove(f) ;
return this;
}

/**
 * Register a plain (non-chaining) service executor.
 * The executor is wrapped in a {@link ChainingServiceExecutorWrapper} and the wrapper
 * is handed to {@link #addSingleLink(ChainingServiceExecutor)}.
 *
 * @param f the executor to register; must not be null
 * @return the result of {@code addSingleLink}
 * @throws NullPointerException if {@code f} is null
 */
public ServiceExecutorRegistry add(ServiceExecutor f) {
    Objects.requireNonNull(f);
    ChainingServiceExecutor link = new ChainingServiceExecutorWrapper(f);
    return addSingleLink(link);
}

/** Insert a service executor factory. Must not be null. */
public ServiceExecutorRegistry add(ServiceExecutorFactory f) {
/**
 * Remove a plain service executor from the per-binding chain.
 * Every link that is a {@link ChainingServiceExecutorWrapper} is unwrapped, and the link
 * is dropped when its delegate equals {@code f} (per {@link Objects#equals(Object, Object)}).
 * Links of any other type are left untouched.
 *
 * @param f the executor whose wrappers should be removed
 * @return this registry, to allow call chaining
 */
public ServiceExecutorRegistry remove(ServiceExecutor f) {
    for (Iterator<ChainingServiceExecutor> links = singleChain.iterator(); links.hasNext(); ) {
        ChainingServiceExecutor link = links.next();
        if (!(link instanceof ChainingServiceExecutorWrapper)) {
            // Only wrapper links can hold the executor we are looking for.
            continue;
        }
        ServiceExecutor wrapped = ((ChainingServiceExecutorWrapper) link).getDelegate();
        if (Objects.equals(wrapped, f)) {
            links.remove();
        }
    }
    return this;
}

/**
* Retrieve the actual list of per-binding executors; allows for re-ordering.
* The returned list is the registry's own list, not a copy, so changes made to it
* change the registry.
*/
public List<ChainingServiceExecutor> getSingleChain() {
return singleChain;
}

/*
* Bulk API
*/

/** Add a chaining bulk executor as a link to the executor chain */
public ServiceExecutorRegistry addBulkLink(ChainingServiceExecutorBulk f) {
Objects.requireNonNull(f) ;
registry.add(0, f) ;
bulkChain.add(0, f) ;
return this;
}

/** Remove the given service executor factory. */
public ServiceExecutorRegistry remove(ServiceExecutorFactory f) {
registry.remove(f) ;
/** Remove the given service executor */
public ServiceExecutorRegistry removeBulkLink(ChainingServiceExecutorBulk f) {
bulkChain.remove(f) ;
return this;
}

/** Retrieve the actual list of factories; allows for re-ordering */
public List<ServiceExecutorFactory> getFactories() {
return registry;
/** Retrieve the actual list of bulk executors; allows for re-ordering */
public List<ChainingServiceExecutorBulk> getBulkChain() {
return bulkChain;
}

}
/*
* Utility
*/

/**
 * Build a new registry whose single and bulk chains contain the same links,
 * in the same order, as this one. The lists themselves are separate, so adding
 * or removing links on the copy does not affect this registry.
 */
public ServiceExecutorRegistry copy() {
    ServiceExecutorRegistry duplicate = new ServiceExecutorRegistry();

    List<ChainingServiceExecutor> singleTarget = duplicate.getSingleChain();
    singleTarget.addAll(getSingleChain());

    List<ChainingServiceExecutorBulk> bulkTarget = duplicate.getBulkChain();
    bulkTarget.addAll(getBulkChain());

    return duplicate;
}

/**
 * Produce a registry based on the one found in the given context.
 * If {@link #get(Context)} finds a registry, a {@link #copy()} of it is returned;
 * otherwise a fresh, empty instance is returned.
 * Note that the state of the instance this method is invoked on is not consulted.
 */
public ServiceExecutorRegistry copyFrom(Context cxt) {
    ServiceExecutorRegistry existing = ServiceExecutorRegistry.get(cxt);
    if (existing == null) {
        return new ServiceExecutorRegistry();
    }
    return existing.copy();
}

/*
* Execution
*/

/**
 * Execute an OpService using the registry selected for the execution context.
 * The registry is picked with {@link #chooseRegistry(Context)} from the context of
 * {@code execCxt}, wrapped in a {@link ServiceExecutorBulkOverRegistry}, and asked to
 * create the execution for {@code opService} over {@code input}.
 *
 * @return the iterator returned by the bulk executor's {@code createExecution}
 */
public static QueryIterator exec(QueryIterator input, OpService opService, ExecutionContext execCxt) {
    ServiceExecutorRegistry chosen = chooseRegistry(execCxt.getContext());
    // Keep the interface type so the call resolves against ServiceExecutorBulk.
    ServiceExecutorBulk bulkExecutor = new ServiceExecutorBulkOverRegistry(chosen);
    return bulkExecutor.createExecution(opService, input, execCxt);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.jena.sparql.service.bulk;

import org.apache.jena.sparql.algebra.op.OpService;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;

/**
 * Interface for custom service execution extensions that handle
 * the iterator over the input bindings themselves.
 */
public interface ChainingServiceExecutorBulk {
    /**
     * Create the execution for a service request.
     * An executor that cannot handle the request should delegate
     * to the chain's {@code createExecution} method and return its result.
     * In any case, a {@link QueryIterator} needs to be returned.
     *
     * @return A non-null {@link QueryIterator} for the execution of the given OpService expression.
     */
    QueryIterator createExecution(OpService opService, QueryIterator input, ExecutionContext execCxt, ServiceExecutorBulk chain);
}

0 comments on commit db4a750

Please sign in to comment.