Skip to content
Browse files

rfe9318: Support session-based (rather than persistent) federation

<release-note>
rfe9318: Support session-based (rather than persistent) federation

The methods in AGServer for creating and deleting federations have
been replaced by a single method, federate, which takes any number of
repository objects, and returns a 'virtual' repository that federates
all of them.
</release-note>

<documentation>
I filed rfe9383 for Bruce to update the documentation.
</documentation>
  • Loading branch information...
1 parent 3c60b65 commit 369fcf0c66a0ebcd94a0b8b7b72ed6c1e0960454 @marijnh marijnh committed Mar 2, 2010
View
12 clojure/src/com/franz/agraph.clj
@@ -91,12 +91,12 @@
(defn repo-federation
"rcons: may be of type AGRepository or AGRepositoryConnection"
- [#^AGServer server repo-name rcons rcon-args]
- (-> (.createFederation server repo-name
- (into-array AGRepository (map #(cond (instance? AGRepository %) %
- (nil? %) nil
- :else (.getRepository #^AGRepositoryConnection %))
- rcons)))
+ [#^AGServer server rcons rcon-args]
+ (-> (.federate server
+ (into-array AGRepository (map #(cond (instance? AGRepository %) %
+ (nil? %) nil
+ :else (.getRepository #^AGRepositoryConnection %))
+ rcons)))
open repo-init (repo-connection rcon-args)))
(defn ag-server
View
3 clojure/test/com/franz/agraph/agtest.clj
@@ -211,8 +211,7 @@
;; create two ordinary stores, and one federated store:
red-con (ag-repo-con cat "redthings" rcon-args)
green-con (ag-repo-con cat "greenthings" rcon-args)
- rainbow-con (repo-federation server "rainbowthings"
- [red-con green-con] rcon-args)
+ rainbow-con (repo-federation server [red-con green-con] rcon-args)
rf (value-factory red-con)
gf (value-factory green-con)
rbf (value-factory rainbow-con)]
View
1 clojure/tutorial/com/franz/agraph/tutorial.clj
@@ -382,7 +382,6 @@
green-rep (repo-init (repository cat "greenthings" :renew))
green-con (repo-connection green-rep rcon-args)
rainbow-rep (repo-init (repo-federation server
- "rainbowthings"
red-rep green-rep))
rainbow-con (repo-connection rainbow-rep rcon-args)
rf (value-factory red-con)
View
19 src/com/franz/agraph/http/AGHTTPClient.java
@@ -336,6 +336,25 @@ public TupleQueryResult getTupleQueryResult(String url) throws RepositoryExcepti
return handler.getString().split("\n");
}
+ public String openSession(String spec, boolean autocommit) throws RepositoryException {
+ String url = AGProtocol.getSessionURL(serverURL);
+ Header[] headers = new Header[0];
+ NameValuePair[] data = { new NameValuePair("store", spec),
+ new NameValuePair(AGProtocol.AUTOCOMMIT_PARAM_NAME,
+ Boolean.toString(autocommit)),
+ new NameValuePair(AGProtocol.LIFETIME_PARAM_NAME,
+ Long.toString(3600)) }; // TODO have some kind of policy for this
+ AGResponseHandler handler = new AGResponseHandler("");
+ try {
+ post(url, headers, data, null, handler);
+ } catch (HttpException e) {
+ throw new RepositoryException(e);
+ } catch (IOException e) {
+ throw new RepositoryException(e);
+ } catch (RDFParseException e) {}
+ return handler.getString();
+ }
+
@Override
public void close() {
Util.close(this.mManager);
View
159 src/com/franz/agraph/http/AGHttpRepoClient.java
@@ -52,43 +52,44 @@
import org.openrdf.rio.RDFHandlerException;
import org.openrdf.rio.RDFParseException;
import org.openrdf.rio.ntriples.NTriplesUtil;
+import org.openrdf.repository.Repository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.franz.agraph.repository.AGRepositoryConnection;
import com.franz.util.Closeable;
/**
* TODO: rename this class.
*/
public class AGHttpRepoClient implements Closeable {
- private final AGRepositoryConnection repoconnection;
final Logger logger = LoggerFactory.getLogger(this.getClass());
// delay using a dedicated session until necessary
private boolean usingDedicatedSession = false;
private boolean autoCommit = true;
- private String sessionRoot;
+ private String sessionRoot, repoRoot;
// TODO: choose proper defaults
private long lifetimeInSeconds = 3600;
private TupleQueryResultFormat preferredTQRFormat = TupleQueryResultFormat.SPARQL;
private BooleanQueryResultFormat preferredBQRFormat = BooleanQueryResultFormat.TEXT;
private RDFFormat preferredRDFFormat = RDFFormat.TRIX;
- public AGHttpRepoClient(AGRepositoryConnection repoconnection)
- throws RepositoryException {
- this.repoconnection = repoconnection;
- sessionRoot = getRepositoryURL();
- }
+ private AGHTTPClient client;
+ private Repository repo;
- public AGRepositoryConnection getRepositoryConnection() {
- return repoconnection;
+ public AGHttpRepoClient(Repository repo, AGHTTPClient client, String repoRoot, String sessionRoot) {
+ this.repo = repo;
+ this.sessionRoot = sessionRoot;
+ this.repoRoot = repoRoot;
+ this.client = client;
}
- public String getSessionRoot() {
- return sessionRoot;
+ public String getRoot() throws RepositoryException {
+ if (sessionRoot != null) return sessionRoot;
+ else if (repoRoot != null) return repoRoot;
+ else throw new RepositoryException("This session-only connection has been closed. Re-open a new one to start using it again.");
}
public TupleQueryResultFormat getPreferredTQRFormat() {
@@ -117,22 +118,17 @@ public void setPreferredRDFFormat(RDFFormat preferredRDFFormat) {
}
public String getServerURL() {
- return getRepositoryConnection().getRepository().getCatalog()
- .getServer().getServerURL();
- }
-
- public String getRepositoryURL() {
- return getRepositoryConnection().getRepository().getRepositoryURL();
+ return client.getServerURL();
}
public AGHTTPClient getHTTPClient() {
- return getRepositoryConnection().getRepository().getHTTPClient();
+ return client;
}
private void useDedicatedSession(boolean autoCommit)
throws RepositoryException, UnauthorizedException {
- if (!usingDedicatedSession) {
- String url = getSessionURL(getRepositoryURL());
+ if (sessionRoot == null) {
+ String url = getSessionURL(getRoot());
Header[] headers = new Header[0];
NameValuePair[] data = {
new NameValuePair(AGProtocol.LIFETIME_PARAM_NAME, Long
@@ -153,33 +149,33 @@ private void useDedicatedSession(boolean autoCommit)
if (logger.isDebugEnabled())
logger.debug("openSession: {}", sessionRoot);
sessionRoot = handler.getString();
- usingDedicatedSession = true;
}
}
private void closeSession(String sessionRoot) throws IOException,
RepositoryException, UnauthorizedException {
- String url = AGProtocol.getSessionCloseLocation(sessionRoot);
- Header[] headers = new Header[0];
- NameValuePair[] params = new NameValuePair[0];
- try {
- getHTTPClient().post(url, headers, params, null, null);
- if (logger.isDebugEnabled())
- logger.debug("closeSession: {}", url);
- } catch (RDFParseException e) {
- // bug.
- throw new RuntimeException(e);
- } finally {
- usingDedicatedSession = false;
- this.sessionRoot = getRepositoryURL();
+ if (sessionRoot != null) {
+ String url = AGProtocol.getSessionCloseLocation(sessionRoot);
+ Header[] headers = new Header[0];
+ NameValuePair[] params = new NameValuePair[0];
+ try {
+ getHTTPClient().post(url, headers, params, null, null);
+ if (logger.isDebugEnabled())
+ logger.debug("closeSession: {}", url);
+ } catch (RDFParseException e) {
+ // bug.
+ throw new RuntimeException(e);
+ } finally {
+ sessionRoot = null;
+ }
}
}
public void getStatements(Resource subj, URI pred, Value obj,
boolean includeInferred, RDFHandler handler, Resource... contexts)
throws IOException, RDFHandlerException, RepositoryException,
UnauthorizedException {
- String uri = Protocol.getStatementsLocation(getSessionRoot());
+ String uri = Protocol.getStatementsLocation(getRoot());
Header[] headers = { new Header(ACCEPT_PARAM_NAME,
getPreferredRDFFormat().getDefaultMIMEType()) };
@@ -209,9 +205,7 @@ public void getStatements(Resource subj, URI pred, Value obj,
uri,
headers,
params.toArray(new NameValuePair[params.size()]),
- new AGResponseHandler(getRepositoryConnection()
- .getRepository(), handler,
- getPreferredRDFFormat()));
+ new AGResponseHandler(repo, handler, getPreferredRDFFormat()));
} catch (AGHttpException e) {
throw new RepositoryException(e);
}
@@ -220,7 +214,7 @@ public void getStatements(Resource subj, URI pred, Value obj,
public void addStatements(Resource subj, URI pred, Value obj,
Resource... contexts) throws IOException, RepositoryException,
UnauthorizedException {
- String uri = Protocol.getStatementsLocation(getSessionRoot());
+ String uri = Protocol.getStatementsLocation(getRoot());
Header[] headers = { new Header("Content-Type", RDFFormat.NTRIPLES
.getDefaultMIMEType()) };
List<NameValuePair> params = new ArrayList<NameValuePair>(5);
@@ -251,7 +245,7 @@ public void addStatements(Resource subj, URI pred, Value obj,
public void deleteStatements(Resource subj, URI pred, Value obj,
Resource... contexts) throws RepositoryException {
- String url = Protocol.getStatementsLocation(getSessionRoot());
+ String url = Protocol.getStatementsLocation(getRoot());
Header[] headers = {};
List<NameValuePair> params = new ArrayList<NameValuePair>(5);
@@ -285,7 +279,7 @@ public void setAutoCommit(boolean autoCommit) throws RepositoryException {
if (!autoCommit) {
useDedicatedSession(false);
}
- String url = AGProtocol.getAutoCommitLocation(getSessionRoot());
+ String url = AGProtocol.getAutoCommitLocation(getRoot());
Header[] headers = {};
NameValuePair[] params = { new NameValuePair(AGProtocol.ON_PARAM_NAME,
Boolean.toString(autoCommit)) };
@@ -306,7 +300,7 @@ public boolean isAutoCommit() throws RepositoryException {
}
public void commit() throws RepositoryException {
- String url = getSessionRoot() + "/" + AGProtocol.COMMIT;
+ String url = getRoot() + "/" + AGProtocol.COMMIT;
Header[] headers = {};
try {
getHTTPClient().post(url, headers, new NameValuePair[0],
@@ -322,7 +316,7 @@ public void commit() throws RepositoryException {
}
public void rollback() throws RepositoryException {
- String url = getSessionRoot() + "/" + AGProtocol.ROLLBACK;
+ String url = getRoot() + "/" + AGProtocol.ROLLBACK;
Header[] headers = {};
try {
getHTTPClient().post(url, headers, new NameValuePair[0],
@@ -338,7 +332,7 @@ public void rollback() throws RepositoryException {
}
public void clearNamespaces() throws RepositoryException {
- String url = Protocol.getNamespacesLocation(getSessionRoot());
+ String url = Protocol.getNamespacesLocation(getRoot());
Header[] headers = {};
try {
getHTTPClient().delete(url, headers, new NameValuePair[0]);
@@ -393,13 +387,13 @@ public void upload(InputStream contents, String baseURI,
public void uploadJSON(JSONArray rows, Resource... contexts)
throws RepositoryException {
- String url = Protocol.getStatementsLocation(getSessionRoot());
+ String url = Protocol.getStatementsLocation(getRoot());
uploadJSON(url, rows, contexts);
}
public void deleteJSON(JSONArray rows, Resource... contexts)
throws RepositoryException {
- uploadJSON(AGProtocol.getStatementsDeleteLocation(getSessionRoot()), rows, contexts);
+ uploadJSON(AGProtocol.getStatementsDeleteLocation(getRoot()), rows, contexts);
}
public void uploadJSON(String url, JSONArray rows, Resource... contexts)
@@ -438,7 +432,7 @@ public void upload(RequestEntity reqEntity, String baseURI,
boolean overwrite, String serverSideFile, URI serverSideURL,
RDFFormat dataFormat, Resource... contexts) throws IOException,
RDFParseException, RepositoryException, UnauthorizedException {
- String url = Protocol.getStatementsLocation(getSessionRoot());
+ String url = Protocol.getStatementsLocation(getRoot());
upload(url, reqEntity, baseURI, overwrite, serverSideFile, serverSideURL, dataFormat, contexts);
}
@@ -496,24 +490,23 @@ public TupleQueryResult getContextIDs() throws IOException,
public void getContextIDs(TupleQueryResultHandler handler)
throws IOException, TupleQueryResultHandlerException,
RepositoryException, UnauthorizedException {
- String url = Protocol.getContextsLocation(getSessionRoot());
+ String url = Protocol.getContextsLocation(getRoot());
Header[] headers = { new Header(ACCEPT_PARAM_NAME,
getPreferredTQRFormat().getDefaultMIMEType()) };
try {
getHTTPClient().get(
url,
headers,
new NameValuePair[0],
- new AGResponseHandler(getRepositoryConnection()
- .getRepository(), handler));
+ new AGResponseHandler(repo, handler));
} catch (AGHttpException e) {
throw new RepositoryException(e);
}
}
public long size(Resource... contexts) throws IOException,
RepositoryException, UnauthorizedException {
- String url = Protocol.getSizeLocation(getSessionRoot());
+ String url = Protocol.getSizeLocation(getRoot());
Header[] headers = {};
String[] encodedContexts = Protocol.encodeContexts(contexts);
NameValuePair[] contextParams = new NameValuePair[encodedContexts.length];
@@ -545,24 +538,23 @@ public TupleQueryResult getNamespaces() throws IOException,
public void getNamespaces(TupleQueryResultHandler handler)
throws IOException, TupleQueryResultHandlerException,
RepositoryException, UnauthorizedException {
- String url = Protocol.getNamespacesLocation(getSessionRoot());
+ String url = Protocol.getNamespacesLocation(getRoot());
Header[] headers = { new Header(ACCEPT_PARAM_NAME,
getPreferredTQRFormat().getDefaultMIMEType()) };
try {
getHTTPClient().get(
url,
headers,
new NameValuePair[0],
- new AGResponseHandler(getRepositoryConnection()
- .getRepository(), handler));
+ new AGResponseHandler(repo, handler));
} catch (AGHttpException e) {
throw new RepositoryException(e);
}
}
public String getNamespace(String prefix) throws IOException,
RepositoryException, UnauthorizedException {
- String url = Protocol.getNamespacePrefixLocation(getSessionRoot(),
+ String url = Protocol.getNamespacePrefixLocation(getRoot(),
prefix);
Header[] headers = {};
AGResponseHandler handler = new AGResponseHandler("");
@@ -578,7 +570,7 @@ public String getNamespace(String prefix) throws IOException,
public void setNamespacePrefix(String prefix, String name)
throws RepositoryException {
- String url = Protocol.getNamespacePrefixLocation(getSessionRoot(),
+ String url = Protocol.getNamespacePrefixLocation(getRoot(),
prefix);
Header[] headers = {};
NameValuePair[] params = {};
@@ -595,7 +587,7 @@ public void setNamespacePrefix(String prefix, String name)
}
public void removeNamespacePrefix(String prefix) throws RepositoryException {
- String url = Protocol.getNamespacePrefixLocation(getSessionRoot(),
+ String url = Protocol.getNamespacePrefixLocation(getRoot(),
prefix);
Header[] headers = {};
try {
@@ -611,7 +603,7 @@ public void query(AGResponseHandler handler, QueryLanguage ql,
String query, Dataset dataset, boolean includeInferred,
String planner, Binding... bindings) throws HttpException,
RepositoryException, RDFParseException, IOException {
- String url = getSessionRoot();
+ String url = getRoot();
List<Header> headers = new ArrayList<Header>(5);
headers.add(new Header("Content-Type", Protocol.FORM_MIME_TYPE
+ "; charset=utf-8"));
@@ -673,9 +665,10 @@ public void query(AGResponseHandler handler, QueryLanguage ql,
}
public void close() throws RepositoryException {
- if (usingDedicatedSession) {
+ if (sessionRoot != null) {
try {
closeSession(sessionRoot);
+ sessionRoot = null;
} catch (IOException e) {
throw new RepositoryException(e);
}
@@ -684,7 +677,7 @@ public void close() throws RepositoryException {
public void createFreetextIndex(String name, URI[] predicates)
throws RepositoryException {
- String url = AGProtocol.getFreetextIndexLocation(getSessionRoot(), name);
+ String url = AGProtocol.getFreetextIndexLocation(getRoot(), name);
List<NameValuePair> params = new ArrayList<NameValuePair>();
if (predicates != null) {
for (URI predicate : predicates) {
@@ -705,7 +698,7 @@ public void createFreetextIndex(String name, URI[] predicates)
public String[] getFreetextPredicates(String index)
throws RepositoryException {
- String url = AGProtocol.getFreetextIndexLocation(getSessionRoot(), index) + "/predicates";
+ String url = AGProtocol.getFreetextIndexLocation(getRoot(), index) + "/predicates";
AGResponseHandler handler = new AGResponseHandler("");
try {
getHTTPClient().get(url, new Header[0], new NameValuePair[0], handler);
@@ -719,7 +712,7 @@ public void createFreetextIndex(String name, URI[] predicates)
public String[] getFreetextIndices()
throws RepositoryException {
- String url = AGProtocol.getFreetextIndexLocation(getSessionRoot());
+ String url = AGProtocol.getFreetextIndexLocation(getRoot());
AGResponseHandler handler = new AGResponseHandler("");
try {
getHTTPClient().get(url, new Header[0], new NameValuePair[0], handler);
@@ -733,7 +726,7 @@ public void createFreetextIndex(String name, URI[] predicates)
public void registerPredicateMapping(URI predicate, URI primitiveType)
throws RepositoryException {
- String url = AGProtocol.getPredicateMappingLocation(getSessionRoot());
+ String url = AGProtocol.getPredicateMappingLocation(getRoot());
String pred_nt = NTriplesUtil.toNTriplesString(predicate);
String primtype_nt = NTriplesUtil.toNTriplesString(primitiveType);
Header[] headers = {};
@@ -754,7 +747,7 @@ public void registerPredicateMapping(URI predicate, URI primitiveType)
public void deletePredicateMapping(URI predicate)
throws RepositoryException {
- String url = AGProtocol.getPredicateMappingLocation(getSessionRoot());
+ String url = AGProtocol.getPredicateMappingLocation(getRoot());
String pred_nt = NTriplesUtil.toNTriplesString(predicate);
Header[] headers = {};
NameValuePair[] params = { new NameValuePair(
@@ -769,7 +762,7 @@ public void deletePredicateMapping(URI predicate)
}
public String[] getPredicateMappings() throws RepositoryException {
- String url = AGProtocol.getPredicateMappingLocation(getSessionRoot());
+ String url = AGProtocol.getPredicateMappingLocation(getRoot());
Header[] headers = new Header[0];
NameValuePair[] params = new NameValuePair[0];
AGResponseHandler handler = new AGResponseHandler("");
@@ -785,7 +778,7 @@ public void deletePredicateMapping(URI predicate)
public void registerDatatypeMapping(URI datatype, URI primitiveType)
throws RepositoryException {
- String url = AGProtocol.getDatatypeMappingLocation(getSessionRoot());
+ String url = AGProtocol.getDatatypeMappingLocation(getRoot());
String datatype_nt = NTriplesUtil.toNTriplesString(datatype);
String primtype_nt = NTriplesUtil.toNTriplesString(primitiveType);
Header[] headers = {};
@@ -805,7 +798,7 @@ public void registerDatatypeMapping(URI datatype, URI primitiveType)
}
public void deleteDatatypeMapping(URI datatype) throws RepositoryException {
- String url = AGProtocol.getDatatypeMappingLocation(getSessionRoot());
+ String url = AGProtocol.getDatatypeMappingLocation(getRoot());
String datatype_nt = NTriplesUtil.toNTriplesString(datatype);
Header[] headers = {};
NameValuePair[] params = { new NameValuePair(
@@ -820,7 +813,7 @@ public void deleteDatatypeMapping(URI datatype) throws RepositoryException {
}
public String[] getDatatypeMappings() throws RepositoryException {
- String url = AGProtocol.getPredicateMappingLocation(getSessionRoot());
+ String url = AGProtocol.getPredicateMappingLocation(getRoot());
Header[] headers = new Header[0];
NameValuePair[] params = new NameValuePair[0];
AGResponseHandler handler = new AGResponseHandler("");
@@ -835,7 +828,7 @@ public void deleteDatatypeMapping(URI datatype) throws RepositoryException {
}
public void clearMappings() throws RepositoryException {
- String url = AGProtocol.getMappingLocation(getSessionRoot());
+ String url = AGProtocol.getMappingLocation(getRoot());
Header[] headers = {};
NameValuePair[] params = {};
try {
@@ -859,7 +852,7 @@ public void addRules(String rules) throws RepositoryException {
public void addRules(InputStream rulestream) throws RepositoryException {
useDedicatedSession(isAutoCommit());
- String url = AGProtocol.getFunctorLocation(getSessionRoot());
+ String url = AGProtocol.getFunctorLocation(getRoot());
Header[] headers = {};
NameValuePair[] params = {};
try {
@@ -888,7 +881,7 @@ public String evalInServer(String lispForm) throws RepositoryException {
}
public String evalInServer(InputStream stream) throws RepositoryException {
- String url = AGProtocol.getEvalLocation(getSessionRoot());
+ String url = AGProtocol.getEvalLocation(getRoot());
Header[] headers = {};
NameValuePair[] params = {};
AGResponseHandler handler = new AGResponseHandler("");
@@ -906,7 +899,7 @@ public String evalInServer(InputStream stream) throws RepositoryException {
public void ping() throws RepositoryException {
if (usingDedicatedSession) {
- String url = AGProtocol.getSessionPingLocation(getSessionRoot());
+ String url = AGProtocol.getSessionPingLocation(getRoot());
Header[] headers = {};
try {
getHTTPClient().get(url, headers, new NameValuePair[0], null);
@@ -921,7 +914,7 @@ public void ping() throws RepositoryException {
}
public String[] getGeoTypes() throws RepositoryException {
- String url = AGProtocol.getGeoTypesLocation(getSessionRoot());
+ String url = AGProtocol.getGeoTypesLocation(getRoot());
Header[] headers = {};
AGResponseHandler handler = new AGResponseHandler("");
try {
@@ -938,7 +931,7 @@ public void ping() throws RepositoryException {
public String registerCartesianType(float stripWidth, float xmin, float xmax,
float ymin, float ymax) throws RepositoryException {
- String url = AGProtocol.getGeoTypesCartesianLocation(getSessionRoot());
+ String url = AGProtocol.getGeoTypesCartesianLocation(getRoot());
Header[] headers = {};
NameValuePair[] params = {
new NameValuePair(AGProtocol.STRIP_WIDTH_PARAM_NAME, Float.toString(stripWidth)),
@@ -962,7 +955,7 @@ public String registerCartesianType(float stripWidth, float xmin, float xmax,
public String registerSphericalType(float stripWidth, String unit, float latmin, float longmin,
float latmax, float longmax) throws RepositoryException {
- String url = AGProtocol.getGeoTypesSphericalLocation(getSessionRoot());
+ String url = AGProtocol.getGeoTypesSphericalLocation(getRoot());
Header[] headers = {};
NameValuePair[] params = {
new NameValuePair(AGProtocol.STRIP_WIDTH_PARAM_NAME, Float.toString(stripWidth)),
@@ -989,7 +982,7 @@ public void registerPolygon(String polygon, List<String> points) throws Reposito
if (points.size()<3) {
throw new IllegalArgumentException("A minimum of three points are required to register a polygon.");
}
- String url = AGProtocol.getGeoPolygonLocation(getSessionRoot());
+ String url = AGProtocol.getGeoPolygonLocation(getRoot());
Header[] headers = {};
List<NameValuePair> params = new ArrayList<NameValuePair>(7);
params.add(new NameValuePair(AGProtocol.RESOURCE_PARAM_NAME, polygon));
@@ -1010,7 +1003,7 @@ public void registerPolygon(String polygon, List<String> points) throws Reposito
public void getGeoBox(String type_uri, String predicate_uri, float xmin, float xmax,
float ymin, float ymax, int limit, boolean infer, AGResponseHandler handler)
throws RepositoryException {
- String url = AGProtocol.getGeoBoxLocation(getSessionRoot());
+ String url = AGProtocol.getGeoBoxLocation(getRoot());
Header[] headers = { new Header(ACCEPT_PARAM_NAME,
getPreferredRDFFormat().getDefaultMIMEType()) };
List<NameValuePair> params = new ArrayList<NameValuePair>(7);
@@ -1043,7 +1036,7 @@ public void getGeoBox(String type_uri, String predicate_uri, float xmin, float x
public void getGeoCircle(String type_uri, String predicate_uri, float x, float y,
float radius, int limit, boolean infer, AGResponseHandler handler)
throws RepositoryException {
- String url = AGProtocol.getGeoCircleLocation(getSessionRoot());
+ String url = AGProtocol.getGeoCircleLocation(getRoot());
Header[] headers = { new Header(ACCEPT_PARAM_NAME,
getPreferredRDFFormat().getDefaultMIMEType()) };
List<NameValuePair> params = new ArrayList<NameValuePair>(7);
@@ -1074,7 +1067,7 @@ public void getGeoCircle(String type_uri, String predicate_uri, float x, float y
public void getGeoHaversine(String type_uri, String predicate_uri, float lat, float lon, float radius, String unit, int limit, boolean infer, AGResponseHandler handler)
throws RepositoryException {
- String url = AGProtocol.getGeoHaversineLocation(getSessionRoot());
+ String url = AGProtocol.getGeoHaversineLocation(getRoot());
Header[] headers = { new Header(ACCEPT_PARAM_NAME,
getPreferredRDFFormat().getDefaultMIMEType()) };
List<NameValuePair> params = new ArrayList<NameValuePair>(7);
@@ -1105,7 +1098,7 @@ public void getGeoHaversine(String type_uri, String predicate_uri, float lat, fl
public void getGeoPolygon(String type_uri, String predicate_uri, String polygon, int limit, boolean infer, AGResponseHandler handler)
throws RepositoryException {
- String url = AGProtocol.getGeoPolygonLocation(getSessionRoot());
+ String url = AGProtocol.getGeoPolygonLocation(getRoot());
Header[] headers = { new Header(ACCEPT_PARAM_NAME,
getPreferredRDFFormat().getDefaultMIMEType()) };
List<NameValuePair> params = new ArrayList<NameValuePair>(7);
@@ -1134,7 +1127,7 @@ public void getGeoPolygon(String type_uri, String predicate_uri, String polygon,
public void registerSNAGenerator(String generator, List<String> objectOfs, List<String> subjectOfs, List<String> undirecteds, String query) throws RepositoryException {
useDedicatedSession(autoCommit);
- String url = AGProtocol.getSNAGeneratorLocation(getSessionRoot(), generator);
+ String url = AGProtocol.getSNAGeneratorLocation(getRoot(), generator);
Header[] headers = {};
List<NameValuePair> params = new ArrayList<NameValuePair>(7);
for (String objectOf: objectOfs) {
@@ -1161,7 +1154,7 @@ public void registerSNAGenerator(String generator, List<String> objectOfs, List<
}
public void registerSNANeighborMatrix(String matrix, String generator, List<String> group, int depth) throws RepositoryException {
- String url = AGProtocol.getSNANeighborMatrixLocation(getSessionRoot(), matrix);
+ String url = AGProtocol.getSNANeighborMatrixLocation(getRoot(), matrix);
Header[] headers = {};
List<NameValuePair> params = new ArrayList<NameValuePair>(7);
params.add(new NameValuePair(AGProtocol.GENERATOR_PARAM_NAME, generator));
View
24 src/com/franz/agraph/http/AGProtocol.java
@@ -175,11 +175,6 @@
public static final String MAPPING_PREDICATE = "predicate";
/**
- * Relative location of the federated service.
- */
- public static final String FEDERATED = "federated";
-
- /**
* Parameter name for the 'url' parameter for federation.
*/
public static final String URL_PARAM_NAME = "url";
@@ -402,13 +397,6 @@ public static final String getRootCatalogURL(String serverURL) {
}
/**
- * Location of the federated pseudo-catalog service
- */
- public static final String getFederatedCatalogURL(String serverURL) {
- return serverURL + "/" + FEDERATED;
- }
-
- /**
* Location of the named catalogs service
*/
public static final String getNamedCatalogsURL(String serverURL) {
@@ -477,14 +465,6 @@ public static String getPredicateMappingLocation(String sessionRoot) {
return getMappingLocation(sessionRoot) + "/" + MAPPING_PREDICATE;
}
- public static String getFederatedLocation(String serverRoot) {
- return serverRoot + "/" + FEDERATED;
- }
-
- public static String getFederationLocation(String serverRoot, String federationName) {
- return getFederatedLocation(serverRoot) + "/" + URLEncoder.encode(federationName);
- }
-
public static String getFunctorLocation(String serverRoot) {
return serverRoot + "/" + FUNCTOR;
}
@@ -497,10 +477,6 @@ public static String getRootCatalogRepositoriesLocation(String catalogURL) {
return catalogURL + "/" + REPOSITORIES;
}
- public static String getFederatedRepositoriesLocation(String catalogURL) {
- return catalogURL;
- }
-
public static String getEvalLocation(String sessionRoot) {
return sessionRoot + "/" + EVAL;
}
View
12 src/com/franz/agraph/repository/AGAbstractRepository.java
@@ -0,0 +1,12 @@
+package com.franz.agraph.repository;
+
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryException;
+import com.franz.util.Closeable;
+
+public interface AGAbstractRepository extends Repository, Closeable {
+ public String getSpec();
+ public AGValueFactory getValueFactory();
+ public AGRepositoryConnection getConnection() throws RepositoryException;
+ public AGCatalog getCatalog();
+}
View
15 src/com/franz/agraph/repository/AGCatalog.java
@@ -29,8 +29,7 @@
*
* A catalog is a grouping of repositories. The root catalog is the
* default, unnamed catalog that is available with every server. It
- * is also possible to create any number of named catalogs. There is
- * also a special catalog that groups federations in the server.
+ * is also possible to create any number of named catalogs.
*/
public class AGCatalog {
@@ -41,7 +40,6 @@
private final String repositoriesURL;
public static final int ROOT_CATALOG = 0;
- public static final int FEDERATED_CATALOG = 1;
public static final int NAMED_CATALOG = 2;
/**
@@ -61,7 +59,7 @@ public AGCatalog(AGServer server, String catalogName) {
/**
* Creates an AGCatalog instance for a special catalog in the given server,
- * either the root catalog or the federated catalog.
+ * such as the root catalog.
*
* @param server the server housing the catalog.
* @param catalogType the type of the special catalog.
@@ -73,11 +71,6 @@ public AGCatalog(AGServer server, int catalogType) {
catalogURL = AGProtocol.getRootCatalogURL(server.getServerURL());
repositoriesURL = AGProtocol.getRootCatalogRepositoriesLocation(catalogURL);
break;
- case FEDERATED_CATALOG:
- catalogName = "federated";
- catalogURL = AGProtocol.getFederatedCatalogURL(server.getServerURL());
- repositoriesURL = AGProtocol.getFederatedRepositoriesLocation(catalogURL);
- break;
default:
throw new IllegalArgumentException("Invalid Catalog Type: " + catalogType);
}
@@ -95,8 +88,7 @@ public AGServer getServer() {
}
/**
- * Returns the name of this catalog. The root and federated catalog
- * also have names, "/" and "federated", respectively.
+ * Returns the name of this catalog. The root catalog has the name "/".
*
* @return the name of this catalog.
*/
@@ -137,7 +129,6 @@ protected String getCatalogPrefixedRepositoryID(String repositoryID) {
case ROOT_CATALOG:
catalogPrefixedRepositoryID = repositoryID;
break;
- case FEDERATED_CATALOG:
case NAMED_CATALOG:
catalogPrefixedRepositoryID = getCatalogName() + ":" + repositoryID;
break;
View
21 src/com/franz/agraph/repository/AGRepository.java
@@ -21,13 +21,14 @@
import org.openrdf.repository.RepositoryException;
import com.franz.agraph.http.AGHTTPClient;
+import com.franz.agraph.http.AGHttpRepoClient;
import com.franz.util.Closeable;
/**
* Implements the Sesame Repository interface for AllegroGraph.
*
*/
-public class AGRepository implements Repository, Closeable {
+public class AGRepository implements AGAbstractRepository, Closeable {
private final AGCatalog catalog;
private final String repositoryID;
@@ -84,15 +85,6 @@ public String getRepositoryURL() {
return repositoryURL;
}
- /**
- * Returns true iff this repository is a federation.
- *
- * @return true iff this repository is a federation.
- */
- public boolean isFederation() {
- return (AGCatalog.FEDERATED_CATALOG == getCatalog().getCatalogType());
- }
-
public AGValueFactory getValueFactory() {
return vf;
}
@@ -105,7 +97,8 @@ public void initialize() throws RepositoryException {
}
public AGRepositoryConnection getConnection() throws RepositoryException {
- return new AGRepositoryConnection(this);
+ AGHttpRepoClient repoclient = new AGHttpRepoClient(this, getCatalog().getHTTPClient(), repositoryURL, null);
+ return new AGRepositoryConnection(this, repoclient);
}
/**
@@ -147,6 +140,12 @@ public boolean isWritable() throws RepositoryException {
return result;
}
+ public String getSpec() {
+ String cname = getCatalog().getCatalogName(), name = getRepositoryID();
+ if (cname == null) return "<" + name + ">";
+ else return "<" + cname + ":" + name + ">";
+ }
+
/**
* The dataDir is not currently applicable to AllegroGraph.
*/
View
15 src/com/franz/agraph/repository/AGRepositoryConnection.java
@@ -26,12 +26,14 @@
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.NamespaceImpl;
import org.openrdf.model.impl.StatementImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.QueryLanguage;
import org.openrdf.query.TupleQueryResult;
+import org.openrdf.repository.Repository;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;
import org.openrdf.repository.RepositoryResult;
@@ -53,18 +55,13 @@
public class AGRepositoryConnection extends RepositoryConnectionBase implements
RepositoryConnection, Closeable {
- private final AGRepository repository;
+ private final AGAbstractRepository repository;
private final AGHttpRepoClient repoclient;
- /**
- * @param repository
- * @throws RepositoryException
- */
- public AGRepositoryConnection(AGRepository repository)
- throws RepositoryException {
+ public AGRepositoryConnection(AGAbstractRepository repository, AGHttpRepoClient client) {
super(repository);
this.repository = repository;
- repoclient = new AGHttpRepoClient(this);
+ this.repoclient = client;
}
/*
@@ -73,7 +70,7 @@ public AGRepositoryConnection(AGRepository repository)
*/
@Override
- public AGRepository getRepository() {
+ public AGAbstractRepository getRepository() {
return repository;
}
View
71 src/com/franz/agraph/repository/AGServer.java
@@ -19,6 +19,7 @@
import org.openrdf.query.BindingSet;
import org.openrdf.query.TupleQueryResult;
import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.Repository;
import com.franz.agraph.http.AGErrorType;
import com.franz.agraph.http.AGHTTPClient;
@@ -35,8 +36,6 @@
private final String serverURL;
private final AGHTTPClient httpClient;
private final AGCatalog rootCatalog;
- private final AGCatalog federatedCatalog;
-
/**
* Creates an AGServer instance for interacting with an AllegroGraph
@@ -51,7 +50,6 @@ public AGServer(String serverURL, String username, String password) {
httpClient = new AGHTTPClient(serverURL);
httpClient.setUsernameAndPassword(username, password);
rootCatalog = new AGCatalog(this,AGCatalog.ROOT_CATALOG);
- federatedCatalog = new AGCatalog(this,AGCatalog.FEDERATED_CATALOG);
}
/**
@@ -77,15 +75,6 @@ public AGCatalog getRootCatalog() {
}
/**
- * Returns the catalog housing all federations for this AllegroGraph server.
- *
- * @return the federated catalog.
- */
- public AGCatalog getFederatedCatalog() {
- return federatedCatalog;
- }
-
- /**
* Returns a List of catalog ids known to this AllegroGraph server.
*
* @return List of catalog ids.
@@ -117,55 +106,17 @@ public AGCatalog getCatalog(String catalogID) {
return new AGCatalog(this, catalogID);
}
- /**
- * Creates a federation over the specified component repositories,
- * naming it by federationName.
- *
- * @param federationName the name of the federation.
- * @param repositories the component repositories.
- * @return the federation instance.
- * @throws RepositoryException
- */
- public AGRepository createFederation(String federationName, AGRepository... repositories)
- throws RepositoryException {
- String url = AGProtocol.getFederationLocation(getServerURL(), federationName);
- Header[] headers = {};
- List<NameValuePair> params = new ArrayList<NameValuePair>(3);
- for (AGRepository repo : repositories) {
- params.add(new NameValuePair(AGProtocol.REPO_PARAM_NAME, repo.getCatalogPrefixedRepositoryID()));
- // TODO: deal with remote repositories
- // params.add(new NameValuePair(AGProtocol.URL_PARAM_NAME, repo.getRepositoryURL()));
- }
- try {
- getHTTPClient().put(url,headers,params.toArray(new NameValuePair[params.size()]),null);
- } catch (IOException e) {
- throw new RepositoryException(e);
- } catch (AGHttpException e) {
- if (AGErrorType.PRECONDITION_FAILED == e.getErrorInfo()
- .getErrorType()) {
- // don't error if repo already exists
- // TODO: check if repo exists first
- } else {
- throw new RepositoryException(e);
- }
- }
- return new AGRepository(getFederatedCatalog(),federationName);
+ public AGVirtualRepository virtualRepository(String spec) {
+ return new AGVirtualRepository(this, spec, null);
}
-
- /**
- * Deletes a federation by name.
- *
- * @param federationName the name of the federation to delete.
- * @throws RepositoryException
- */
- public void deleteFederation(String federationName) throws RepositoryException {
- String url = AGProtocol.getFederationLocation(getServerURL(), federationName);
- Header[] headers = new Header[0];
- try {
- getHTTPClient().delete(url,headers,null);
- } catch (IOException e) {
- throw new RepositoryException(e);
- }
+
+ public AGVirtualRepository federate(AGAbstractRepository... repositories) {
+ String[] specstrings = new String[repositories.length];
+ for (int i = 0; i < repositories.length; i++)
+ specstrings[i] = repositories[i].getSpec();
+ String spec = AGVirtualRepository.federatedSpec(specstrings);
+
+ return new AGVirtualRepository(this, spec, null);
}
@Override
View
96 src/com/franz/agraph/repository/AGVirtualRepository.java
@@ -0,0 +1,96 @@
+package com.franz.agraph.repository;
+
+import java.io.File;
+import java.lang.RuntimeException;
+
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.BNode;
+
+import com.franz.agraph.http.AGHttpRepoClient;
+import com.franz.agraph.http.AGHTTPClient;
+import com.franz.agraph.repository.AGValueFactory;
+import com.franz.util.Closeable;
+
+public class AGVirtualRepository implements AGAbstractRepository, Closeable {
+ private AGServer server;
+ private AGRepository wrapped;
+ private String spec;
+ private AGValueFactory vf;
+
+ private class AGFederatedValueFactory extends AGValueFactory {
+ public AGFederatedValueFactory() {
+ super(null);
+ }
+
+ public BNode createBNode(String nodeID) {
+ throw new RuntimeException("Can not create a blank node for a federated store.");
+ }
+ }
+
+ public AGVirtualRepository(AGServer server, String spec, AGRepository wrapped) {
+ this.server = server;
+ this.spec = spec;
+ this.wrapped = wrapped;
+ if (wrapped == null)
+ vf = new AGFederatedValueFactory();
+ else
+ vf = new AGValueFactory(wrapped);
+ }
+
+ public AGServer getServer() {
+ return server;
+ }
+ public AGCatalog getCatalog() {
+ return null;
+ }
+ public String getSpec() {
+ return spec;
+ }
+
+ // interface
+ public boolean isWritable() {
+ return wrapped != null;
+ }
+ public AGValueFactory getValueFactory() {
+ return vf;
+ }
+ public AGRepositoryConnection getConnection() throws RepositoryException {
+ AGHTTPClient client = server.getHTTPClient();
+ AGHttpRepoClient repoclient = new AGHttpRepoClient(this, client, null, client.openSession(spec, true));
+ return new AGRepositoryConnection(this, repoclient);
+ }
+ public void close() throws RepositoryException {
+ shutDown();
+ }
+
+ // stubs
+ public void initialize() {}
+ public void shutDown() throws RepositoryException {}
+ public void setDataDir(File dataDir) {
+ throw new RuntimeException("setDataDir is inapplicable for AG repositories");
+ }
+ public File getDataDir() {
+ throw new RuntimeException("getDataDir is inapplicable for AG repositories");
+ }
+
+ // string-mangling utilities for creating sessions
+ public static String federatedSpec(String[] repos) {
+ String spec = "";
+ for (int i = 0; i < repos.length; i++) {
+ if (spec.length() > 0) spec += " + ";
+ spec += repos[i];
+ }
+ return spec;
+ }
+ public static String reasoningSpec(String repo, String reasoner) {
+ return repo + "[" + reasoner + "]";
+ }
+ public static String filteredSpec(String repo, String[] graphs) {
+ repo += "{";
+ for (String graph : graphs) repo += " " + graph;
+ return repo + "}";
+ }
+}
View
3 src/test/AGAbstractTest.java
@@ -36,6 +36,7 @@
import com.franz.agraph.repository.AGCatalog;
import com.franz.agraph.repository.AGRepository;
+import com.franz.agraph.repository.AGAbstractRepository;
import com.franz.agraph.repository.AGRepositoryConnection;
import com.franz.agraph.repository.AGServer;
import com.franz.agraph.repository.AGValueFactory;
@@ -171,7 +172,7 @@ AGRepositoryConnection getConnection() throws RepositoryException {
return conn;
}
- AGRepositoryConnection getConnection(AGRepository repo) throws RepositoryException {
+ AGRepositoryConnection getConnection(AGAbstractRepository repo) throws RepositoryException {
AGRepositoryConnection conn = repo.getConnection();
closeLater(conn);
return conn;
View
5 src/test/TutorialTests.java
@@ -55,6 +55,7 @@
import com.franz.agraph.repository.AGQueryLanguage;
import com.franz.agraph.repository.AGRepository;
+import com.franz.agraph.repository.AGAbstractRepository;
import com.franz.agraph.repository.AGRepositoryConnection;
import com.franz.agraph.repository.AGServer;
import com.franz.agraph.repository.AGValueFactory;
@@ -545,8 +546,8 @@ public void example16() throws Exception {
AGRepositoryConnection greenConn = getConnection(greenRepo);
greenConn.clear();
ValueFactory gf = greenConn.getValueFactory();
- AGServer server = repo.getCatalog().getServer();
- AGRepository rainbowRepo = server.createFederation("rainbowthingsjv", redRepo, greenRepo);
+ AGServer server = cat.getServer();
+ AGAbstractRepository rainbowRepo = server.federate(redRepo, greenRepo);
closeLater(rainbowRepo);
rainbowRepo.initialize();
assertFalse("Federation is writable?", rainbowRepo.isWritable());
View
5 src/tutorial/TutorialExamples.java
@@ -44,6 +44,7 @@
import com.franz.agraph.repository.AGCatalog;
import com.franz.agraph.repository.AGQueryLanguage;
import com.franz.agraph.repository.AGRepository;
+import com.franz.agraph.repository.AGAbstractRepository;
import com.franz.agraph.repository.AGRepositoryConnection;
import com.franz.agraph.repository.AGServer;
import com.franz.agraph.repository.AGValueFactory;
@@ -1455,7 +1456,7 @@ private static void pt(String kind, TupleQueryResult rows) throws Exception {
*/
public static void example16() throws Exception {
AGRepositoryConnection conn = example6();
- AGRepository myRepository = conn.getRepository();
+ AGAbstractRepository myRepository = conn.getRepository();
AGCatalog catalog = myRepository.getCatalog();
// create two ordinary stores, and one federated store:
AGRepository redRepo = catalog.createRepository("redthingsjv");
@@ -1471,7 +1472,7 @@ public static void example16() throws Exception {
greenConn.clear();
ValueFactory gf = greenConn.getValueFactory();
AGServer server = myRepository.getCatalog().getServer();
- AGRepository rainbowRepo = server.createFederation("rainbowthingsjv", redRepo, greenRepo);
+ AGAbstractRepository rainbowRepo = server.federate(redRepo, greenRepo);
rainbowRepo.initialize();
println("Federation is writable? " + rainbowRepo.isWritable());
AGRepositoryConnection rainbowConn = rainbowRepo.getConnection();

0 comments on commit 369fcf0

Please sign in to comment.
Something went wrong with that request. Please try again.