Skip to content

Commit

Permalink
Improve Lyo Store for use in the persistent scenario (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
berezovskyi committed Jun 7, 2021
1 parent 37147dc commit c9a326f
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Added

- Store adds interface support for closing the Store cleanly and releasing underlying connections.
- SparqlStoreImpl can now be set up with a custom query executor

### Changed

- Update SHACLEX from 0.0.87 to 0.1.93 (breaking change but should not affect the consumers of Lyo Validation)
Expand Down
15 changes: 11 additions & 4 deletions store/store-core/src/main/java/org/eclipse/lyo/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ <T extends IResource> List<T> getResources(URI namedGraphUri, Class<T> clazz, in
* @throws ModelUnmarshallingException if the classes cannot be instantiated or another error
* occurred when working with Jena model.
*/
<T extends IResource> List<T> getResources(URI namedGraphUri, Class<T> clazz,
String prefixes, String where, String searchTerms,
<T extends IResource> List<T> getResources(URI namedGraphUri, Class<T> clazz,
String prefixes, String where, String searchTerms,
int limit, int offset) throws StoreAccessException, ModelUnmarshallingException;

/**
* Retrieve a Jena model that satisfies the given where parameter as defined in the OSLC Query language (https://tools.oasis-open.org/version-control/svn/oslc-core/trunk/specs/oslc-query.html)
* Retrieve a Jena model that satisfies the given where parameter as defined in the OSLC Query language (https://tools.oasis-open.org/version-control/svn/oslc-core/trunk/specs/oslc-query.html)
* If the namedGraph is null, the query is applied on all namedGraph in the triplestore.
* The method currently only provides support for terms of type Comparisons, where the operator is 'EQUALS', and the operand is either a String or a URI.
*
Expand All @@ -187,7 +187,7 @@ <T extends IResource> List<T> getResources(URI namedGraphUri, Class<T> clazz,
Model getResources(URI namedGraph, String prefixes, String where, int limit, int offset);

/**
* Retrieve a Jena model that satisfies the given where parameter as defined in the OSLC Query language (https://tools.oasis-open.org/version-control/svn/oslc-core/trunk/specs/oslc-query.html)
* Retrieve a Jena model that satisfies the given where parameter as defined in the OSLC Query language (https://tools.oasis-open.org/version-control/svn/oslc-core/trunk/specs/oslc-query.html)
* If the namedGraph is null, the query is applied on all namedGraph in the triplestore.
* The method currently only provides support for terms of type Comparisons, where the operator is 'EQUALS', and the operand is either a String or a URI.
*
Expand Down Expand Up @@ -312,4 +312,11 @@ default <T extends IResource> boolean appendResource(URI namedGraphUri, final T
* @since 0.23.0
*/
void removeAll();

/**
* Close connection
*
* @since 4.1.0
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ public void removeAll() {
}
}

@Override
public void close() {
TDB.sync(dataset);
dataset.close();
}

@Override
public <T extends IResource> T getResource(final URI namedGraph, final URI uri,
final Class<T> clazz)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public <T extends IResource> List<T> getResources(final URI namedGraph, final Cl
public Model getResources(final URI namedGraph, final String prefixes, final String where, final int limit, final int offset) {
return getResources(namedGraph, prefixes, where, null, limit, offset);
}

@Override
public Model getResources(final URI namedGraph, final String prefixes, final String where, final String searchTerms, final int limit, final int offset) {

Expand Down Expand Up @@ -305,7 +305,7 @@ public Model getResources(final URI namedGraph, final String prefixes, final Str
Query describeQuery = describeBuilder.build() ;
String describeQueryString = describeQuery.toString();
final QueryExecution queryExecution = queryExecutor.prepareSparqlQuery(describeQueryString);

Model execDescribe;
try {
execDescribe = queryExecution.execDescribe();
Expand Down Expand Up @@ -377,6 +377,12 @@ public void removeAll() {
queryExecutor.prepareSparqlUpdate("CLEAR ALL").execute();
}

@Override
public void close() {
queryExecutor.release();
log.debug("Underlying SPARQL connection has been released");
}

private <T extends IResource> String oslcQueryPrefixes(final Class<T> clazz) {
return "rdf=" + "<" + org.apache.jena.vocabulary.RDF.uri + ">";
}
Expand Down Expand Up @@ -454,7 +460,7 @@ private Model modelFromQueryByUri(final URI namedGraph, final URI uri) {
throw e;
}
return execDescribe;

}

private Model modelFromQueryFlatPaged(final URI namedGraph, final URI type, final int limit,
Expand Down Expand Up @@ -528,15 +534,15 @@ private SelectBuilder constructSparqlWhere(final String prefixes, final String w
SimpleTerm simpleTerm = iterator.next();
Type termType = simpleTerm.type();
PName property = simpleTerm.property();

if (!termType.equals(Type.COMPARISON)){
throw new UnsupportedOperationException("only support for terms of type Comparisons");
}
ComparisonTerm aComparisonTerm = (ComparisonTerm) simpleTerm;
if (!aComparisonTerm.operator().equals(Operator.EQUALS)){
throw new UnsupportedOperationException("only support for terms of type Comparisons, where the operator is 'EQUALS'");
}

Value comparisonOperand = aComparisonTerm.operand();
Value.Type operandType = comparisonOperand.type();
String predicate;
Expand All @@ -546,7 +552,7 @@ private SelectBuilder constructSparqlWhere(final String prefixes, final String w
else {
predicate = property.toString();
}

switch (operandType) {
case DECIMAL:
DecimalValue decimalOperand = (DecimalValue) comparisonOperand;
Expand All @@ -568,22 +574,22 @@ private SelectBuilder constructSparqlWhere(final String prefixes, final String w
} catch (ParseException e) {
throw new IllegalArgumentException("whereExpression could not be parsed", e);
}

//Setup searchTerms
//Add a sparql filter "FILTER regex(?o, "<searchTerms>", "i")" to the distinctResourcesQuery
if (!StringUtils.isEmpty(searchTerms)) {
ExprFactory factory = new ExprFactory();
E_Regex regex = factory.regex(factory.str("?o"), searchTerms, "i");
distinctResourcesQuery.addFilter(regex);
}

if (limit > 0) {
distinctResourcesQuery.setLimit(limit);
}
if (offset > 0) {
distinctResourcesQuery.setOffset(offset);
}

SelectBuilder constructSelectQuery = new SelectBuilder();
constructSelectQuery.addVar( "s p o" )
.addSubQuery(distinctResourcesQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.jena.query.Dataset;
import org.apache.jena.query.QueryExecution;
import org.apache.jena.query.QueryExecutionFactory;
import org.apache.jena.tdb.TDB;
import org.apache.jena.tdb.TDBFactory;
import org.apache.jena.update.GraphStore;
import org.apache.jena.update.GraphStoreFactory;
Expand All @@ -40,7 +41,7 @@
public class DatasetQueryExecutorImpl implements JenaQueryExecutor {
private static final Logger log = LoggerFactory.getLogger(DatasetQueryExecutorImpl.class);
private final Dataset dataset;
private final GraphStore graphStore;
private volatile boolean released = false;

/**
* Use {@link StoreFactory} instead.
Expand All @@ -49,21 +50,33 @@ public DatasetQueryExecutorImpl() {
this(TDBFactory.createDataset());
}

DatasetQueryExecutorImpl(final Dataset dataset) {
public DatasetQueryExecutorImpl(final Dataset dataset) {
this.dataset = dataset;
this.graphStore = GraphStoreFactory.create(dataset);
}

@Override
public QueryExecution prepareSparqlQuery(final String query) {
if(released) {
throw new IllegalStateException("Cannot execute queries after releasing the connection");
}
log.debug("Running query: '{}'", query);
return QueryExecutionFactory.create(query, dataset);
}

@Override
public UpdateProcessor prepareSparqlUpdate(final String query) {
if(released) {
throw new IllegalStateException("Cannot execute queries after releasing the connection");
}
log.debug("Running update: '{}'", query);
final UpdateRequest update = UpdateFactory.create(query);
return UpdateExecutionFactory.create(update, graphStore);
return UpdateExecutionFactory.create(update, dataset);
}

@Override
public void release() {
TDB.sync(dataset);
released = true;
dataset.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public interface JenaQueryExecutor {
* @return prepared processor
*/
UpdateProcessor prepareSparqlUpdate(String query);

/**
* Release a connection to the underlying engine
*/
void release();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.jena.update.UpdateExecutionFactory;
import org.apache.jena.update.UpdateFactory;
import org.apache.jena.update.UpdateProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* SparqlQueryExecutorImpl is a SPARQL endpoint-based implementation of {@link JenaQueryExecutor}.
Expand All @@ -34,10 +38,12 @@
* @since 0.14.0
*/
public class SparqlQueryExecutorBasicAuthImpl implements JenaQueryExecutor {
private final Logger log = LoggerFactory.getLogger(SparqlQueryExecutorBasicAuthImpl.class);

private final String queryEndpoint;
private final String updateEndpoint;
private final CloseableHttpClient client;
private volatile boolean released = false;

public SparqlQueryExecutorBasicAuthImpl(final String sparqlEndpoint,
final String updateEndpoint, final String login, final String password) {
Expand All @@ -52,15 +58,32 @@ public SparqlQueryExecutorBasicAuthImpl(final String sparqlEndpoint,

@Override
public QueryExecution prepareSparqlQuery(final String query) {
if (released) {
throw new IllegalStateException("Cannot execute queries after releasing the connection");
}
return QueryExecutionFactory.sparqlService(queryEndpoint, query, client);
}

@Override
public UpdateProcessor prepareSparqlUpdate(final String query) {
if (released) {
throw new IllegalStateException("Cannot execute queries after releasing the connection");
}
return UpdateExecutionFactory.createRemote(
UpdateFactory.create(query),
updateEndpoint,
client
UpdateFactory.create(query),
updateEndpoint,
client
);
}

@Override
public void release() {
try {
released = true;
client.close();
} catch (IOException e) {
log.warn("Failed to close the HTTP client cleanly");
log.debug("Failed to close the HTTP client cleanly", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.jena.update.UpdateExecutionFactory;
import org.apache.jena.update.UpdateFactory;
import org.apache.jena.update.UpdateProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SparqlQueryExecutorImpl is a SPARQL endpoint-based implementation of {@link JenaQueryExecutor}.
Expand All @@ -28,6 +30,7 @@
* @since 0.14.0
*/
public class SparqlQueryExecutorImpl implements JenaQueryExecutor {
private final Logger log = LoggerFactory.getLogger(SparqlQueryExecutorImpl.class);

private final String queryEndpoint;
private final String updateEndpoint;
Expand All @@ -46,4 +49,9 @@ public QueryExecution prepareSparqlQuery(final String query) {
public UpdateProcessor prepareSparqlUpdate(final String query) {
return UpdateExecutionFactory.createRemote(UpdateFactory.create(query), updateEndpoint);
}

@Override
public void release() {
log.trace("NOP, there is nothing to release");
}
}

0 comments on commit c9a326f

Please sign in to comment.