From 9f929949bab11d04e16c3033a73386affccdf430 Mon Sep 17 00:00:00 2001 From: kchilton2 Date: Tue, 8 May 2018 18:33:24 -0400 Subject: [PATCH 1/2] RYA-494 Fixed a bug where the shell was not loading or displaying all Statements. --- .../accumulo/AccumuloExecuteSparqlQuery.java | 2 +- .../AccumuloExecuteSparqlQueryIT.java | 144 ++++++++++++++++++ .../src/test/resources/test-statements.nt | 22 +++ .../org/apache/rya/shell/RyaCommands.java | 2 +- .../rya/shell/AccumuloRyaCommandsIT.java | 91 +++++++++++ .../src/test/resources/test-statements.nt | 22 +++ 6 files changed, 281 insertions(+), 2 deletions(-) create mode 100644 extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java create mode 100644 extras/indexing/src/test/resources/test-statements.nt create mode 100644 extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java create mode 100644 extras/shell/src/test/resources/test-statements.nt diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQuery.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQuery.java index b97ae8abc..e226260ed 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQuery.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQuery.java @@ -93,7 +93,7 @@ public TupleQueryResult executeSparqlQuery(final String ryaInstanceName, final S sailRepo = new SailRepository(sail); sailRepoConn = sailRepo.getConnection(); - // Execute the query. + // Execute the query. final TupleQuery tupleQuery = sailRepoConn.prepareTupleQuery(QueryLanguage.SPARQL, sparqlQuery); return tupleQuery.evaluate(); } catch (final SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java new file mode 100644 index 000000000..8c9d67a43 --- /dev/null +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.RdfCloudTripleStoreConstants; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests for the methods of {@link AccumuloExecuteSparqlQueryIT}. + */ +public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase { + + @Test + public void queryFindsAllLoadedStatements_fromSet() throws Exception { + // Using the Rya Client, install an instance of Rya for the test. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + + final String ryaInstance = UUID.randomUUID().toString().replace('-', '_'); + client.getInstall().install(ryaInstance, InstallConfiguration.builder().build()); + + // Load some data into the instance. + final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set statements = Sets.newHashSet( + vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")), + vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), + vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")), + vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), + vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")), + vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob"))); + client.getLoadStatements().loadStatements(ryaInstance, statements); + + // Execute a query. + final Set fetched = new HashSet<>(); + try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) { + while(result.hasNext()) { + final BindingSet bs = result.next(); + + // If this is the statement that indicates the Rya version. + if(RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE.equals(bs.getBinding("p").getValue())) { + continue; + } + + // Otherwise add it to the list of fetched statements. + fetched.add( vf.createStatement( + (Resource)bs.getBinding("s").getValue(), + (IRI)bs.getBinding("p").getValue(), + bs.getBinding("o").getValue()) ); + } + } + + // Show it resulted in the expected results. + assertEquals(statements, fetched); + } + + @Test + public void queryFindsAllLoadedStatements_fromFile() throws Exception { + // Using the Rya Client, install an instance of Rya for the test. 
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + + final String ryaInstance = UUID.randomUUID().toString().replace('-', '_'); + client.getInstall().install(ryaInstance, InstallConfiguration.builder().build()); + + // Load some data into the instance from a file. + client.getLoadStatementsFile().loadStatements(ryaInstance, Paths.get("src/test/resources/test-statements.nt"), RDFFormat.NTRIPLES); + + // Execute a query. + final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set fetched = new HashSet<>(); + try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) { + while(result.hasNext()) { + final BindingSet bs = result.next(); + + // If this is the statement that indicates the Rya version + if(RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE.equals(bs.getBinding("p").getValue())) { + continue; + } + + // Otherwise add it to the list of fetched statements. + fetched.add( vf.createStatement( + (Resource)bs.getBinding("s").getValue(), + (IRI)bs.getBinding("p").getValue(), + bs.getBinding("o").getValue()) ); + } + } + + // Show it resulted in the expected results. + final Set expected = Sets.newHashSet( + vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")), + vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), + vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")), + vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), + vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")), + vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob"))); + assertEquals(expected, fetched); + } +} \ No newline at end of file diff --git a/extras/indexing/src/test/resources/test-statements.nt b/extras/indexing/src/test/resources/test-statements.nt new file mode 100644 index 000000000..db0d51fae --- /dev/null +++ b/extras/indexing/src/test/resources/test-statements.nt @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + . + . + . + . + . + . 
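The substantive change in this patch is the one-line edit to RyaCommands.java further down: when printing query results, the shell fetched a BindingSet with rezIter.next() and then called rezIter.next() a second time just to read the binding names, silently consuming an extra result, so at least one row never reached the output. A minimal, self-contained illustration of that iterator misuse follows (plain java.util.Iterator rather than the real TupleQueryResult; all names here are hypothetical).

    import java.util.Arrays;
    import java.util.Iterator;

    public class IteratorMisuseDemo {
        public static void main(final String[] args) {
            final Iterator<String> rezIter = Arrays.asList("row1", "row2").iterator();

            // Fetch the first result, exactly like the shell does.
            final String first = rezIter.next();

            // Broken shape: asking the iterator again for header information consumes "row2".
            // final String headerSource = rezIter.next();

            // Fixed shape: reuse the result that was already fetched.
            final String headerSource = first;

            System.out.println("header built from: " + headerSource);
            while (rezIter.hasNext()) {
                // With the fix this prints "row2"; with the broken shape it would print nothing.
                System.out.println("remaining row: " + rezIter.next());
            }
        }
    }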
\ No newline at end of file diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaCommands.java index c25786092..5004be303 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/RyaCommands.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaCommands.java @@ -185,7 +185,7 @@ public String sparqlQuery( if(rezIter.hasNext()) { consolePrinter.println("Query Results:"); final BindingSet bs = rezIter.next(); - for(final String name : rezIter.next().getBindingNames()) { + for(final String name : bs.getBindingNames()) { bindings.add(name); } consolePrinter.println(Strings.join(bindings, ",")); diff --git a/extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java b/extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java new file mode 100644 index 000000000..803612529 --- /dev/null +++ b/extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.shell.util.InstallPrompt; +import org.apache.rya.shell.util.PasswordPrompt; +import org.apache.rya.shell.util.SparqlPrompt; +import org.junit.Test; +import org.springframework.context.ApplicationContext; +import org.springframework.shell.Bootstrap; +import org.springframework.shell.core.CommandResult; +import org.springframework.shell.core.JLineShellComponent; + +import com.google.common.base.Optional; + +/** + * Integration tests for the methods of {@link RyaCommands}. + */ +public class AccumuloRyaCommandsIT extends RyaShellAccumuloITBase { + + @Test + public void loadsAndQueryData() throws Exception { + final MiniAccumuloCluster cluster = getCluster(); + final Bootstrap bootstrap = getTestBootstrap(); + final JLineShellComponent shell = getTestShell(); + + // Mock the user entering the correct password. + final ApplicationContext context = bootstrap.getApplicationContext(); + final PasswordPrompt mockPrompt = context.getBean( PasswordPrompt.class ); + when(mockPrompt.getPassword()).thenReturn("password".toCharArray()); + + // Connect to the mini accumulo instance. + String cmd = + RyaConnectionCommands.CONNECT_ACCUMULO_CMD + " " + + "--username root " + + "--instanceName " + cluster.getInstanceName() + " "+ + "--zookeepers " + cluster.getZooKeepers(); + CommandResult result = shell.executeCommand(cmd); + + // Install an instance of rya. 
+ final String instanceName = "testInstance"; + final InstallConfiguration installConf = InstallConfiguration.builder().build(); + + final InstallPrompt installPrompt = context.getBean( InstallPrompt.class ); + when(installPrompt.promptInstanceName()).thenReturn("testInstance"); + when(installPrompt.promptInstallConfiguration("testInstance")).thenReturn( installConf ); + when(installPrompt.promptVerified(instanceName, installConf)).thenReturn(true); + + result = shell.executeCommand( RyaAdminCommands.INSTALL_CMD ); + assertTrue( result.isSuccess() ); + + // Connect to the instance that was just installed. + cmd = RyaConnectionCommands.CONNECT_INSTANCE_CMD + " --instance " + instanceName; + result = shell.executeCommand(cmd); + assertTrue( result.isSuccess() ); + + // Load a statements file into the instance. + cmd = RyaCommands.LOAD_DATA_CMD + " --file src/test/resources/test-statements.nt"; + result = shell.executeCommand(cmd); + assertTrue( result.isSuccess() ); + + // Query for all of the statements that were loaded. + final SparqlPrompt sparqlPrompt = context.getBean(SparqlPrompt.class); + when(sparqlPrompt.getSparql()).thenReturn(Optional.of("select * where { ?s ?p ?o .}")); + + cmd = RyaCommands.SPARQL_QUERY_CMD; + result = shell.executeCommand(cmd); + assertTrue( result.isSuccess() ); + } +} \ No newline at end of file diff --git a/extras/shell/src/test/resources/test-statements.nt b/extras/shell/src/test/resources/test-statements.nt new file mode 100644 index 000000000..db0d51fae --- /dev/null +++ b/extras/shell/src/test/resources/test-statements.nt @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + . + . + . + . + . + . \ No newline at end of file From a21518a81e9de93067cec18948611e9411cdd292 Mon Sep 17 00:00:00 2001 From: kchilton2 Date: Tue, 17 Apr 2018 15:10:26 -0400 Subject: [PATCH 2/2] RYA-487 Implement Kafka Connect Sink implementations for Accumulo and Mongo DB backed Rya. 
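This sink consumes Kafka records whose values are Sets of RDF4J Statements, serialized with the RDF4J Binary RDF format by the StatementsSerializer and StatementsDeserializer classes added below, and writes them into a Rya instance. A rough sketch of how an application might publish such a record is shown here; the broker address and the "statements" topic name are assumptions for illustration only.

    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.rya.kafka.connect.api.StatementsSerializer;
    import org.eclipse.rdf4j.model.Statement;
    import org.eclipse.rdf4j.model.ValueFactory;
    import org.eclipse.rdf4j.model.impl.SimpleValueFactory;

    import com.google.common.collect.Sets;

    public class PublishStatementsExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());

            final ValueFactory vf = SimpleValueFactory.getInstance();
            final Set<Statement> statements = Sets.newHashSet(
                    vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")));

            // Publish one record whose value is the whole set of statements.
            try (KafkaProducer<String, Set<Statement>> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("statements", statements));
            }
        }
    }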
--- .../accumulo/AccumuloRdfConfiguration.java | 96 ++-- .../rya/mongodb/MongoDBRdfConfiguration.java | 8 +- .../rya/indexing/accumulo/ConfigUtils.java | 3 + extras/kafka.connect/README.md | 22 + extras/kafka.connect/accumulo-it/README.md | 19 + extras/kafka.connect/accumulo-it/pom.xml | 62 +++ .../accumulo/AccumuloRyaSinkTaskIT.java | 100 ++++ extras/kafka.connect/accumulo/README.md | 23 + extras/kafka.connect/accumulo/pom.xml | 79 +++ .../accumulo/AccumuloRyaSinkConfig.java | 97 ++++ .../accumulo/AccumuloRyaSinkConnector.java | 66 +++ .../connect/accumulo/AccumuloRyaSinkTask.java | 112 ++++ .../accumulo/AccumuloRyaSinkConfigTest.java | 42 ++ extras/kafka.connect/api/README.md | 20 + extras/kafka.connect/api/pom.xml | 96 ++++ .../connect/api/StatementsConverter.java | 62 +++ .../connect/api/StatementsDeserializer.java | 87 ++++ .../kafka/connect/api/StatementsSerde.java | 57 ++ .../connect/api/StatementsSerializer.java | 77 +++ .../kafka/connect/api/sink/RyaSinkConfig.java | 67 +++ .../connect/api/sink/RyaSinkConnector.java | 69 +++ .../kafka/connect/api/sink/RyaSinkTask.java | 145 ++++++ .../connect/api/StatementsSerdeTest.java | 84 +++ .../connect/api/sink/RyaSinkTaskTest.java | 264 ++++++++++ .../test/resources/simplelogger.properties | 17 + extras/kafka.connect/client/README.md | 21 + extras/kafka.connect/client/pom.xml | 113 ++++ .../rya/kafka/connect/client/CLIDriver.java | 121 +++++ .../connect/client/RyaKafkaClientCommand.java | 115 ++++ .../client/command/ReadStatementsCommand.java | 120 +++++ .../command/WriteStatementsCommand.java | 187 +++++++ .../src/main/resources/log4j.properties | 27 + extras/kafka.connect/mongo-it/README.md | 19 + extras/kafka.connect/mongo-it/pom.xml | 62 +++ .../connect/mongo/MongoRyaSinkTaskIT.java | 95 ++++ extras/kafka.connect/mongo/README.md | 23 + extras/kafka.connect/mongo/pom.xml | 79 +++ .../connect/mongo/MongoRyaSinkConfig.java | 94 ++++ .../connect/mongo/MongoRyaSinkConnector.java | 63 +++ .../kafka/connect/mongo/MongoRyaSinkTask.java | 123 +++++ .../connect/mongo/MongoRyaSinkConfigTest.java | 42 ++ extras/kafka.connect/pom.xml | 66 +++ extras/pom.xml | 1 + extras/rya.manual/src/site/markdown/_index.md | 1 + extras/rya.manual/src/site/markdown/index.md | 1 + .../markdown/kafka-connect-integration.md | 493 ++++++++++++++++++ extras/rya.manual/src/site/site.xml | 3 +- pom.xml | 52 +- 48 files changed, 3637 insertions(+), 58 deletions(-) create mode 100644 extras/kafka.connect/README.md create mode 100644 extras/kafka.connect/accumulo-it/README.md create mode 100644 extras/kafka.connect/accumulo-it/pom.xml create mode 100644 extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java create mode 100644 extras/kafka.connect/accumulo/README.md create mode 100644 extras/kafka.connect/accumulo/pom.xml create mode 100644 extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java create mode 100644 extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java create mode 100644 extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java create mode 100644 extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java create mode 100644 extras/kafka.connect/api/README.md create mode 100644 extras/kafka.connect/api/pom.xml create mode 100644 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java create mode 100644 extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java create mode 100644 extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java create mode 100644 extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java create mode 100644 extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java create mode 100644 extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java create mode 100644 extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java create mode 100644 extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java create mode 100644 extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java create mode 100644 extras/kafka.connect/api/src/test/resources/simplelogger.properties create mode 100644 extras/kafka.connect/client/README.md create mode 100644 extras/kafka.connect/client/pom.xml create mode 100644 extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java create mode 100644 extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java create mode 100644 extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java create mode 100644 extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java create mode 100644 extras/kafka.connect/client/src/main/resources/log4j.properties create mode 100644 extras/kafka.connect/mongo-it/README.md create mode 100644 extras/kafka.connect/mongo-it/pom.xml create mode 100644 extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java create mode 100644 extras/kafka.connect/mongo/README.md create mode 100644 extras/kafka.connect/mongo/pom.xml create mode 100644 extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java create mode 100644 extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java create mode 100644 extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java create mode 100644 extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java create mode 100644 extras/kafka.connect/pom.xml create mode 100644 extras/rya.manual/src/site/markdown/kafka-connect-integration.md diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java index ed76b4a32..cbfe2ea1a 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } - public AccumuloRdfConfiguration(Configuration other) { + public AccumuloRdfConfiguration(final Configuration other) { super(other); } - public AccumuloRdfConfigurationBuilder getBuilder() { + public static AccumuloRdfConfigurationBuilder getBuilder() { return new AccumuloRdfConfigurationBuilder(); } - + /** * 
Creates an AccumuloRdfConfiguration object from a Properties file. This method assumes * that all values in the Properties file are Strings and that the Properties file uses the keys below. @@ -94,26 +94,26 @@ public AccumuloRdfConfigurationBuilder getBuilder() { * @param props - Properties file containing Accumulo specific configuration parameters * @return AccumumuloRdfConfiguration with properties set */ - - public static AccumuloRdfConfiguration fromProperties(Properties props) { + + public static AccumuloRdfConfiguration fromProperties(final Properties props) { return AccumuloRdfConfigurationBuilder.fromProperties(props).build(); } - + @Override public AccumuloRdfConfiguration clone() { return new AccumuloRdfConfiguration(this); } - + /** * Sets the Accumulo username from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. * */ - public void setAccumuloUser(String user) { + public void setAccumuloUser(final String user) { Preconditions.checkNotNull(user); set(CLOUDBASE_USER, user); } - + /** * Get the Accumulo username from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. @@ -121,19 +121,19 @@ public void setAccumuloUser(String user) { * @return The username if one could be found; otherwise {@code null}. */ public String getAccumuloUser(){ - return get(CLOUDBASE_USER); + return get(CLOUDBASE_USER); } - + /** * Sets the Accumulo password from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. * */ - public void setAccumuloPassword(String password) { + public void setAccumuloPassword(final String password) { Preconditions.checkNotNull(password); set(CLOUDBASE_PASSWORD, password); } - + /** * Get the Accumulo password from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. @@ -143,18 +143,18 @@ public void setAccumuloPassword(String password) { public String getAccumuloPassword() { return get(CLOUDBASE_PASSWORD); } - + /** * Sets a comma delimited list of the names of the Zookeeper servers from * the configuration object that is meant to be used when connecting a * {@link Connector} to Accumulo. * */ - public void setAccumuloZookeepers(String zookeepers) { + public void setAccumuloZookeepers(final String zookeepers) { Preconditions.checkNotNull(zookeepers); set(CLOUDBASE_ZOOKEEPERS, zookeepers); } - + /** * Get a comma delimited list of the names of the Zookeeper servers from * the configuration object that is meant to be used when connecting a @@ -165,17 +165,17 @@ public void setAccumuloZookeepers(String zookeepers) { public String getAccumuloZookeepers() { return get(CLOUDBASE_ZOOKEEPERS); } - + /** * Sets the Accumulo instance name from the configuration object that is * meant to be used when connecting a {@link Connector} to Accumulo. * */ - public void setAccumuloInstance(String instance) { + public void setAccumuloInstance(final String instance) { Preconditions.checkNotNull(instance); set(CLOUDBASE_INSTANCE, instance); } - + /** * Get the Accumulo instance name from the configuration object that is * meant to be used when connecting a {@link Connector} to Accumulo. @@ -185,15 +185,15 @@ public void setAccumuloInstance(String instance) { public String getAccumuloInstance() { return get(CLOUDBASE_INSTANCE); } - + /** * Tells the Rya instance to use a Mock instance of Accumulo as its backing. 
* */ - public void setUseMockAccumulo(boolean useMock) { + public void setUseMockAccumulo(final boolean useMock) { setBoolean(USE_MOCK_INSTANCE, useMock); } - + /** * Indicates that a Mock instance of Accumulo is being used to back the Rya instance. * @@ -202,12 +202,12 @@ public void setUseMockAccumulo(boolean useMock) { public boolean getUseMockAccumulo() { return getBoolean(USE_MOCK_INSTANCE, false); } - + /** * @param enabled - {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}. */ - public void useMockInstance(boolean enabled) { + public void useMockInstance(final boolean enabled) { super.setBooleanIfUnset(USE_MOCK_INSTANCE, enabled); } @@ -224,7 +224,7 @@ public boolean useMockInstance() { * @param username - The Accumulo username from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. */ - public void setUsername(String username) { + public void setUsername(final String username) { super.set(CLOUDBASE_USER, username); } @@ -242,7 +242,7 @@ public String getUsername() { * @param password - The Accumulo password from the configuration object that is meant to * be used when connecting a {@link Connector} to Accumulo. */ - public void setPassword(String password) { + public void setPassword(final String password) { super.set(CLOUDBASE_PASSWORD, password); } @@ -260,7 +260,7 @@ public String getPassword() { * @param instanceName - The Accumulo instance name from the configuration object that is * meant to be used when connecting a {@link Connector} to Accumulo. */ - public void setInstanceName(String instanceName) { + public void setInstanceName(final String instanceName) { super.set(CLOUDBASE_INSTANCE, instanceName); } @@ -279,7 +279,7 @@ public String getInstanceName() { * the configuration object that is meant to be used when connecting a * {@link Connector} to Accumulo. */ - public void setZookeepers(String zookeepers) { + public void setZookeepers(final String zookeepers) { super.set(CLOUDBASE_ZOOKEEPERS, zookeepers); } @@ -295,14 +295,14 @@ public String getZookeepers() { } public Authorizations getAuthorizations() { - String[] auths = getAuths(); + final String[] auths = getAuths(); if (auths == null || auths.length == 0) { return AccumuloRdfConstants.ALL_AUTHORIZATIONS; } return new Authorizations(auths); } - public void setMaxRangesForScanner(Integer max) { + public void setMaxRangesForScanner(final Integer max) { setInt(MAXRANGES_SCANNER, max); } @@ -310,9 +310,9 @@ public Integer getMaxRangesForScanner() { return getInt(MAXRANGES_SCANNER, 2); } - public void setAdditionalIndexers(Class... indexers) { - List strs = Lists.newArrayList(); - for (Class ai : indexers){ + public void setAdditionalIndexers(final Class... indexers) { + final List strs = Lists.newArrayList(); + for (final Class ai : indexers){ strs.add(ai.getName()); } @@ -326,25 +326,25 @@ public boolean flushEachUpdate(){ return getBoolean(CONF_FLUSH_EACH_UPDATE, true); } - public void setFlush(boolean flush){ + public void setFlush(final boolean flush){ setBoolean(CONF_FLUSH_EACH_UPDATE, flush); } - public void setAdditionalIterators(IteratorSetting... additionalIterators){ + public void setAdditionalIterators(final IteratorSetting... 
additionalIterators){ //TODO do we need to worry about cleaning up this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length)); int i = 0; - for(IteratorSetting iterator : additionalIterators) { + for(final IteratorSetting iterator : additionalIterators) { this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName()); this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass()); this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority())); - Map options = iterator.getOptions(); + final Map options = iterator.getOptions(); this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size())); - Iterator> it = options.entrySet().iterator(); + final Iterator> it = options.entrySet().iterator(); int j = 0; while(it.hasNext()) { - Entry item = it.next(); + final Entry item = it.next(); this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey()); this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue()); j++; @@ -354,22 +354,22 @@ public void setAdditionalIterators(IteratorSetting... additionalIterators){ } public IteratorSetting[] getAdditionalIterators(){ - int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0")); + final int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0")); if(size == 0) { return new IteratorSetting[0]; } - IteratorSetting[] settings = new IteratorSetting[size]; + final IteratorSetting[] settings = new IteratorSetting[size]; for(int i = 0; i < size; i++) { - String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i)); - String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i)); - int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i))); + final String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i)); + final String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i)); + final int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i))); - int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i))); - Map options = new HashMap<>(optionsSize); + final int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i))); + final Map options = new HashMap<>(optionsSize); for(int j = 0; j < optionsSize; j++) { - String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j)); - String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j)); + final String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j)); + final String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j)); options.put(key, value); } settings[i] = new IteratorSetting(priority, name, iteratorClass, options); diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java index d49f2eea2..b207d7944 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java @@ -274,17 +274,17 @@ public boolean getUseAggregationPipeline() { * on their child subtrees. * @param value whether to use aggregation pipeline optimization. 
*/ - public void setUseAggregationPipeline(boolean value) { + public void setUseAggregationPipeline(final boolean value) { setBoolean(USE_AGGREGATION_PIPELINE, value); } @Override public List> getOptimizers() { - List> optimizers = super.getOptimizers(); + final List> optimizers = super.getOptimizers(); if (getUseAggregationPipeline()) { - Class cl = AggregationPipelineQueryOptimizer.class; + final Class cl = AggregationPipelineQueryOptimizer.class; @SuppressWarnings("unchecked") - Class optCl = (Class) cl; + final Class optCl = (Class) cl; optimizers.add(optCl); } return optimizers; diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java index d2fe58a31..77c77cd0d 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java @@ -438,6 +438,9 @@ public Optional getFluoAppName(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } + public static void setUseMongo(final Configuration conf, final boolean useMongo) { + conf.setBoolean(USE_MONGO, useMongo); + } public static boolean getUseMongo(final Configuration conf) { return conf.getBoolean(USE_MONGO, false); diff --git a/extras/kafka.connect/README.md b/extras/kafka.connect/README.md new file mode 100644 index 000000000..03b63c2c7 --- /dev/null +++ b/extras/kafka.connect/README.md @@ -0,0 +1,22 @@ + + +The parent project for all Rya Kafka Connect work. All projects that are part +of that system must use this project's pom as their parent pom. + +For more information about the Rya's Kafka Connect support, see +[the manual](../rya.manual/src/site/markdown/kafka-connect-integration.md). \ No newline at end of file diff --git a/extras/kafka.connect/accumulo-it/README.md b/extras/kafka.connect/accumulo-it/README.md new file mode 100644 index 000000000..abcc12d38 --- /dev/null +++ b/extras/kafka.connect/accumulo-it/README.md @@ -0,0 +1,19 @@ + + +This project contains integration tests that verify an Accumulo backed +implementation of the Rya Kafka Connect Sink is working properly. \ No newline at end of file diff --git a/extras/kafka.connect/accumulo-it/pom.xml b/extras/kafka.connect/accumulo-it/pom.xml new file mode 100644 index 000000000..af088a938 --- /dev/null +++ b/extras/kafka.connect/accumulo-it/pom.xml @@ -0,0 +1,62 @@ + + + + 4.0.0 + + + org.apache.rya + rya.kafka.connect.parent + 4.0.0-incubating-SNAPSHOT + + + rya.kafka.connect.accumulo.it + + Apache Rya Kafka Connect - Accumulo Integration Tests + Tests the Kafka Connect Sink that writes to a Rya instance backed by Accumulo. + + + + + org.apache.rya + rya.kafka.connect.accumulo + + + + + org.apache.kafka + connect-api + provided + + + + + junit + junit + test + + + org.apache.rya + rya.test.accumulo + test + + + \ No newline at end of file diff --git a/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java new file mode 100644 index 000000000..1775a74d8 --- /dev/null +++ b/extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.junit.Test; + +/** + * Integration tests for the methods of {@link AccumuloRyaSinkTask}. + */ +public class AccumuloRyaSinkTaskIT extends AccumuloITBase { + + @Test + public void instanceExists() throws Exception { + // Install an instance of Rya. + final String ryaInstanceName = getRyaInstanceName(); + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableEntityCentricIndex(false) + .setEnableFreeTextIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(false) + .setEnableGeoIndex(false) + .build(); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + ryaClient.getInstall().install(ryaInstanceName, installConfig); + + // Create the task that will be tested. + final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask(); + + try { + // Configure the task to use the embedded accumulo instance for Rya. + final Map config = new HashMap<>(); + config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers()); + config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName()); + config.put(AccumuloRyaSinkConfig.USERNAME, getUsername()); + config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword()); + config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, ryaInstanceName); + + // This will pass because the Rya instance exists. + task.start(config); + + } finally { + task.stop(); + } + } + + @Test(expected = ConnectException.class) + public void instanceDoesNotExist() throws Exception { + // Create the task that will be tested. + final AccumuloRyaSinkTask task = new AccumuloRyaSinkTask(); + + try { + // Configure the task to use the embedded accumulo instance for Rya. + final Map config = new HashMap<>(); + config.put(AccumuloRyaSinkConfig.ZOOKEEPERS, getZookeepers()); + config.put(AccumuloRyaSinkConfig.CLUSTER_NAME, getInstanceName()); + config.put(AccumuloRyaSinkConfig.USERNAME, getUsername()); + config.put(AccumuloRyaSinkConfig.PASSWORD, getPassword()); + config.put(AccumuloRyaSinkConfig.RYA_INSTANCE_NAME, getRyaInstanceName()); + + // Staring the task will fail because the Rya instance does not exist. 
+ task.start(config); + + } finally { + task.stop(); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/accumulo/README.md b/extras/kafka.connect/accumulo/README.md new file mode 100644 index 000000000..eecfd2111 --- /dev/null +++ b/extras/kafka.connect/accumulo/README.md @@ -0,0 +1,23 @@ + + +This project is the Rya Kafka Connect Sink that writes to Accumulo backed +instances of Rya. + +This project produces a shaded jar that may be installed into Kafka Connect. +For more information about how to install and configure this connector, see +[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md). \ No newline at end of file diff --git a/extras/kafka.connect/accumulo/pom.xml b/extras/kafka.connect/accumulo/pom.xml new file mode 100644 index 000000000..54188db02 --- /dev/null +++ b/extras/kafka.connect/accumulo/pom.xml @@ -0,0 +1,79 @@ + + + + 4.0.0 + + + org.apache.rya + rya.kafka.connect.parent + 4.0.0-incubating-SNAPSHOT + + + rya.kafka.connect.accumulo + + Apache Rya Kafka Connect - Accumulo + A Kafka Connect Sink that writes to a Rya instance backed by Accumulo. + + + + + org.apache.rya + rya.kafka.connect.api + + + org.apache.rya + rya.indexing + + + + + org.apache.kafka + connect-api + provided + + + + + junit + junit + test + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + + + \ No newline at end of file diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java new file mode 100644 index 000000000..8db4f1ce2 --- /dev/null +++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfig.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Connect configuration that is used to configure {@link AccumuloRyaSinkConnector}s + * and {@link AccumuloRyaSinkTask}s. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConfig extends RyaSinkConfig { + + public static final String ZOOKEEPERS = "accumulo.zookeepers"; + private static final String ZOOKEEPERS_DOC = "A comma delimited list of the Zookeeper server hostname/port pairs."; + + public static final String CLUSTER_NAME = "accumulo.cluster.name"; + private static final String CLUSTER_NAME_DOC = "The name of the Accumulo instance within Zookeeper."; + + public static final String USERNAME = "accumulo.username"; + private static final String USERNAME_DOC = "The Accumulo username the Sail connections will use."; + + public static final String PASSWORD = "accumulo.password"; + private static final String PASSWORD_DOC = "The Accumulo password the Sail connections will use."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ZOOKEEPERS, Type.STRING, Importance.HIGH, ZOOKEEPERS_DOC) + .define(CLUSTER_NAME, Type.STRING, Importance.HIGH, CLUSTER_NAME_DOC) + .define(USERNAME, Type.STRING, Importance.HIGH, USERNAME_DOC) + .define(PASSWORD, Type.PASSWORD, Importance.HIGH, PASSWORD_DOC); + static { + RyaSinkConfig.addCommonDefinitions(CONFIG_DEF); + } + + /** + * Constructs an instance of {@link AccumuloRyaSinkConfig}. + * + * @param originals - The key/value pairs that define the configuration. (not null) + */ + public AccumuloRyaSinkConfig(final Map originals) { + super(CONFIG_DEF, requireNonNull(originals)); + } + + /** + * @return A comma delimited list of the Zookeeper server hostname/port pairs. + */ + public String getZookeepers() { + return super.getString(ZOOKEEPERS); + } + + /** + * @return The name of the Accumulo instance within Zookeeper. + */ + public String getClusterName() { + return super.getString(CLUSTER_NAME); + } + + /** + * @return The Accumulo username the Sail connections will use. + */ + public String getUsername() { + return super.getString(USERNAME); + } + + /** + * @return The Accumulo password the Sail connections will use. + */ + public String getPassword() { + return super.getPassword(PASSWORD).value(); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java new file mode 100644 index 000000000..eeb3d751f --- /dev/null +++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConnector extends RyaSinkConnector { + + @Nullable + private AccumuloRyaSinkConfig config = null; + + @Override + public void start(final Map props) { + requireNonNull(props); + this.config = new AccumuloRyaSinkConfig( props ); + } + + @Override + protected AbstractConfig getConfig() { + if(config == null) { + throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first."); + } + return config; + } + + @Override + public Class taskClass() { + return AccumuloRyaSinkTask.class; + } + + @Override + public ConfigDef config() { + return AccumuloRyaSinkConfig.CONFIG_DEF; + } +} \ No newline at end of file diff --git a/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java new file mode 100644 index 000000000..7d19f2959 --- /dev/null +++ b/extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.log.LogUtils; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + + @Override + protected void checkRyaInstanceExists(final Map taskConfig) throws ConnectException { + requireNonNull(taskConfig); + + // Parse the configuration object. + final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + + // Connect to the instance of Accumulo. + final Connector connector; + try { + final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); + connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); + } catch (final AccumuloException | AccumuloSecurityException e) { + throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); + } + + // Use a RyaClient to see if the configured instance exists. + try { + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + config.getUsername(), + config.getPassword().toCharArray(), + config.getClusterName(), + config.getZookeepers()); + final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + + if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { + throw new ConnectException("The Rya Instance named " + + LogUtils.clean(config.getRyaInstanceName()) + " has not been installed."); + } + + } catch (final RyaClientException e) { + throw new ConnectException("Unable to determine if the Rya Instance named " + + LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e); + } + } + + @Override + protected Sail makeSail(final Map taskConfig) throws ConnectException { + requireNonNull(taskConfig); + + // Parse the configuration object. + final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + + // Move the configuration into a Rya Configuration object. 
+ final AccumuloRdfConfiguration ryaConfig = new AccumuloRdfConfiguration(); + ryaConfig.setTablePrefix( config.getRyaInstanceName() ); + ryaConfig.setAccumuloZookeepers( config.getZookeepers() ); + ryaConfig.setAccumuloInstance( config.getClusterName() ); + ryaConfig.setAccumuloUser( config.getUsername() ); + ryaConfig.setAccumuloPassword( config.getPassword() ); + + // Create the Sail object. + try { + return RyaSailFactory.getInstance(ryaConfig); + } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java new file mode 100644 index 000000000..66ecd878d --- /dev/null +++ b/extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; +import org.junit.Test; + +/** + * Unit tests the methods of {@link AccumuloRyaSinkConfig}. + */ +public class AccumuloRyaSinkConfigTest { + + @Test + public void parses() { + final Map properties = new HashMap<>(); + properties.put(AccumuloRyaSinkConfig.ZOOKEEPERS, "zoo1:2181,zoo2"); + properties.put(AccumuloRyaSinkConfig.CLUSTER_NAME, "test"); + properties.put(AccumuloRyaSinkConfig.USERNAME, "alice"); + properties.put(AccumuloRyaSinkConfig.PASSWORD, "alice1234!@"); + properties.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya_"); + new AccumuloRyaSinkConfig(properties); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/README.md b/extras/kafka.connect/api/README.md new file mode 100644 index 000000000..777fd2a19 --- /dev/null +++ b/extras/kafka.connect/api/README.md @@ -0,0 +1,20 @@ + + +This project contains the common components of a Rya Kafka Connect Sink. Each +backend database that Rya is built on top of must have an implementation using +this project's components. 
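A backend implementation of this API supplies three pieces: a config, a connector, and a sink task. The sketch below shows the shape of a hypothetical backend task, assuming the only required override points are the two methods visible in the Accumulo implementation above (checkRyaInstanceExists and makeSail); the generic type parameters are also assumptions, since angle-bracketed types were stripped from this patch text.

    package org.apache.rya.kafka.connect.example;

    import java.util.Map;

    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
    import org.eclipse.rdf4j.sail.Sail;

    /**
     * A hypothetical backend-specific task; the real ones (Accumulo, Mongo) parse their own
     * config objects inside these methods.
     */
    public class ExampleRyaSinkTask extends RyaSinkTask {

        @Override
        protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
            // Verify the configured Rya instance has been installed; throw ConnectException if it has not.
        }

        @Override
        protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
            // Build and return a Sail that writes to this backend.
            throw new ConnectException("Not implemented in this sketch.");
        }
    }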
\ No newline at end of file diff --git a/extras/kafka.connect/api/pom.xml b/extras/kafka.connect/api/pom.xml new file mode 100644 index 000000000..3727394c5 --- /dev/null +++ b/extras/kafka.connect/api/pom.xml @@ -0,0 +1,96 @@ + + + + 4.0.0 + + + org.apache.rya + rya.kafka.connect.parent + 4.0.0-incubating-SNAPSHOT + + + rya.kafka.connect.api + + Apache Rya Kafka Connect - API + Contains common components used when implementing a Kafka Connect Sink + that writes to a Rya instance. + + + + + org.apache.rya + rya.api.model + + + + + org.eclipse.rdf4j + rdf4j-rio-api + + + org.eclipse.rdf4j + rdf4j-rio-binary + + + org.eclipse.rdf4j + rdf4j-rio-datatypes + + + com.github.stephenc.findbugs + findbugs-annotations + + + com.jcabi + jcabi-manifests + + + org.apache.kafka + connect-api + provided + + + org.eclipse.rdf4j + rdf4j-runtime + + + org.slf4j + slf4j-api + + + + + junit + junit + test + + + org.mockito + mockito-all + test + + + org.slf4j + slf4j-simple + test + + + \ No newline at end of file diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java new file mode 100644 index 000000000..eb4b61125 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsConverter.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.eclipse.rdf4j.model.Statement; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A plugin into the Kafka Connect platform that converts {@link Set}s of {@link Statement}s + * to/from byte[]s by using a {@link StatementsSerializer} and a {@link StatementsDeserializer}. + *
+ * This converter does not use Kafka's Schema Registry. + */ +@DefaultAnnotation(NonNull.class) +public class StatementsConverter implements Converter { + + private static final StatementsSerializer SERIALIZER = new StatementsSerializer(); + private static final StatementsDeserializer DESERIALIZER = new StatementsDeserializer(); + + @Override + public void configure(final Map configs, final boolean isKey) { + // This converter's behavior can not be tuned with configurations. + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + requireNonNull(value); + return SERIALIZER.serialize(topic, (Set) value); + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + requireNonNull(value); + return new SchemaAndValue(null, DESERIALIZER.deserialize(topic, value)); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java new file mode 100644 index 000000000..fb03347a1 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Deserializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory; +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized + * set of {@link Statement}s. + */ +@DefaultAnnotation(NonNull.class) +public class StatementsDeserializer implements Deserializer> { + private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class); + + private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory(); + + @Override + public void configure(final Map configs, final boolean isKey) { + // Nothing to do. 
+ } + + @Override + public Set deserialize(final String topic, final byte[] data) { + if(data == null || data.length == 0) { + // Return null because that is the contract of this method. + return null; + } + + try { + final RDFParser parser = PARSER_FACTORY.getParser(); + final Set statements = new HashSet<>(); + + parser.setRDFHandler(new AbstractRDFHandler() { + @Override + public void handleStatement(final Statement statement) throws RDFHandlerException { + log.debug("Statement: " + statement); + statements.add( statement ); + } + }); + + parser.parse(new ByteArrayInputStream(data), null); + return statements; + + } catch(final RDFParseException | RDFHandlerException | IOException e) { + log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binary format.", e); + return null; + } + } + + @Override + public void close() { + // Nothing to do. + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java new file mode 100644 index 000000000..f2101d63f --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerde.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Provides a {@link Serializer} and {@link Deserializer} for {@link Statement}s. + */ +@DefaultAnnotation(NonNull.class) +public class StatementsSerde implements Serde> { + + @Override + public void configure(final Map configs, final boolean isKey) { + // Nothing to do. + } + + @Override + public Serializer> serializer() { + return new StatementsSerializer(); + } + + @Override + public Deserializer> deserializer() { + return new StatementsDeserializer(); + } + + @Override + public void close() { + // Nothing to do. 
+ } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java new file mode 100644 index 000000000..893df0cd2 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. + */ +@DefaultAnnotation(NonNull.class) +public class StatementsSerializer implements Serializer> { + private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class); + + private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory(); + + @Override + public void configure(final Map configs, final boolean isKey) { + // Nothing to do. + } + + @Override + public byte[] serialize(final String topic, final Set data) { + if(data == null) { + // Returning null because that is the contract of this method. + return null; + } + + // Write the statements using a Binary RDF Writer. + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final RDFWriter writer = WRITER_FACTORY.getWriter(baos); + writer.startRDF(); + + for(final Statement stmt : data) { + // Write the statement. + log.debug("Writing Statement: " + stmt); + writer.handleStatement(stmt); + } + writer.endRDF(); + + // Return the byte[] version of the data. + return baos.toByteArray(); + } + + @Override + public void close() { + // Nothing to do. + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java new file mode 100644 index 000000000..5c3e2ccca --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConfig.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Contains common configuration fields for a Rya Sinks. + */ +@DefaultAnnotation(NonNull.class) +public class RyaSinkConfig extends AbstractConfig { + + public static final String RYA_INSTANCE_NAME = "rya.instance.name"; + private static final String RYA_INSTANCE_NAME_DOC = "The name of the RYA instance that will be connected to."; + + /** + * @param configDef - The configuration schema definition that will be updated to include + * this configuration's fields. (not null) + */ + public static void addCommonDefinitions(final ConfigDef configDef) { + requireNonNull(configDef); + configDef.define(RYA_INSTANCE_NAME, Type.STRING, Importance.HIGH, RYA_INSTANCE_NAME_DOC); + } + + /** + * Constructs an instance of {@link RyaSinkConfig}. + * + * @param definition - Defines the schema of the configuration. (not null) + * @param originals - The key/value pairs that define the configuration. (not null) + */ + public RyaSinkConfig(final ConfigDef definition, final Map originals) { + super(definition, originals); + } + + /** + * @return The name of the RYA instance that will be connected to. + */ + public String getRyaInstanceName() { + return super.getString(RYA_INSTANCE_NAME); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java new file mode 100644 index 000000000..f288af238 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api.sink; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.sink.SinkConnector; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Handles the common components required to task {@link RyaSinkTask}s that write to Rya. + *
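+ * In particular, {@link #taskConfigs(int)} hands every task a copy of the connector's own configuration, so all
+ * tasks write to the same Rya instance.
+ *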
+ * Implementations of this class only need to specify functionality that is specific to the Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkConnector extends SinkConnector { + + /** + * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked. + *
+ * Only called after start has been invoked + * + * @return The configuration object for the connector. + * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet. + */ + protected abstract AbstractConfig getConfig() throws IllegalStateException; + + @Override + public String version() { + return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN"; + } + + @Override + public List> taskConfigs(final int maxTasks) { + final List> configs = new ArrayList<>(maxTasks); + for(int i = 0; i < maxTasks; i++) { + configs.add( getConfig().originalsStrings() ); + } + return configs; + } + + @Override + public void stop() { + // Nothing to do since the RyaSinkConnector has no background monitoring. + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java new file mode 100644 index 000000000..5ff118ae1 --- /dev/null +++ b/extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + *
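+ * The lifecycle follows the Kafka Connect contract: {@link #start(Map)} opens a repository connection to the
+ * configured Rya instance, {@link #put(Collection)} adds each record's Statements within an open transaction,
+ * {@link #flush(Map)} commits that transaction, and {@link #stop()} closes the connection and shuts the repository down.
+ *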
+ * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + + @Nullable + private SailRepository sailRepo = null; + + @Nullable + private SailRepositoryConnection conn = null; + + /** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. + */ + protected abstract void checkRyaInstanceExists(final Map taskConfig) throws ConnectException; + + /** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ + protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + + @Override + public String version() { + return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; + } + + @Override + public void start(final Map props) throws ConnectException { + requireNonNull(props); + + // Ensure the configured Rya Instance is installed within the configured database. + checkRyaInstanceExists(props); + + // Create the Sail object that is connected to the Rya Instance. + final Sail sail = makeSail(props); + sailRepo = new SailRepository( sail ); + conn = sailRepo.getConnection(); + } + + @Override + public void put(final Collection records) { + requireNonNull(records); + + // Return immediately if there are no records to handle. + if(records.isEmpty()) { + return; + } + + // If a transaction has not been started yet, then start one. + if(!conn.isActive()) { + conn.begin(); + } + + // Iterate through the records and write them to the Sail object. + for(final SinkRecord record : records) { + // If everything has been configured correctly, then the record's value will be a Set. + conn.add((Set) record.value()); + } + } + + @Override + public void flush(final Map currentOffsets) { + requireNonNull(currentOffsets); + // Flush the current transaction. + conn.commit(); + } + + @Override + public void stop() { + try { + if(conn != null) { + conn.close(); + } + } catch(final Exception e) { + log.error("Could not close the Sail Repository Connection.", e); + } + + try { + if(sailRepo != null) { + sailRepo.shutDown(); + } + } catch(final Exception e) { + log.error("Could not shut down the Sail Repository.", e); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java new file mode 100644 index 000000000..01e5b7603 --- /dev/null +++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/StatementsSerdeTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.Set; + +import org.apache.kafka.common.serialization.Serde; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link StatementsSerde}. + */ +public class StatementsSerdeTest { + + @Test + public void serializeAndDeserialize() { + // Create the object that will be serialized. + final ValueFactory vf = SimpleValueFactory.getInstance(); + + final Set original = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:alice"), + vf.createIRI("urn:talksTo"), + vf.createIRI("urn:bob"), + vf.createIRI("urn:testGraph")), + vf.createStatement( + vf.createIRI("urn:bob"), + vf.createIRI("urn:talksTo"), + vf.createIRI("urn:charlie"), + vf.createIRI("urn:graph2")), + vf.createStatement( + vf.createIRI("urn:charlie"), + vf.createIRI("urn:talksTo"), + vf.createIRI("urn:bob"), + vf.createIRI("urn:graph2")), + vf.createStatement( + vf.createIRI("urn:alice"), + vf.createIRI("urn:listensTo"), + vf.createIRI("urn:charlie"), + vf.createIRI("urn:testGraph"))); + + // Serialize it. + try(final Serde> serde = new StatementsSerde()) { + final byte[] bytes = serde.serializer().serialize("topic", original); + + // Deserialize it. + final Set deserialized = serde.deserializer().deserialize("topic", bytes); + + // Show the deserialized value matches the original. + assertEquals(original, deserialized); + } + } + + @Test + public void deserializeEmptyData() { + try(final Serde> serde = new StatementsSerde()) { + assertNull( serde.deserializer().deserialize("topic", new byte[0]) ); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java new file mode 100644 index 000000000..e90042d00 --- /dev/null +++ b/extras/kafka.connect/api/src/test/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTaskTest.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api.sink; + +import static org.junit.Assert.assertEquals; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.connect.sink.SinkRecord; +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailConnection; +import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.memory.MemoryStore; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link RyaSinkTask}. + */ +public class RyaSinkTaskTest { + + /** + * A {@link RyaSinkTask} used to test against an in memory Sail instance. + */ + private static final class InMemoryRyaSinkTask extends RyaSinkTask { + + private Sail sail = null; + + @Override + protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException { + // Do nothing. Always assume the Rya Instance exists. + } + + @Override + protected Sail makeSail(final Map taskConfig) { + if(sail == null) { + sail = new MemoryStore(); + sail.initialize(); + } + return sail; + } + } + + @Test(expected = IllegalStateException.class) + public void start_ryaInstanceDoesNotExist() { + // Create the task that will be tested. + final RyaSinkTask task = new RyaSinkTask() { + @Override + protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException { + throw new IllegalStateException("It doesn't exist."); + } + + @Override + protected Sail makeSail(final Map taskConfig) { return null; } + }; + + // Since the rya instance does not exist, this will throw an exception. + task.start(new HashMap<>()); + } + + @Test + public void singleRecord() { + // Create the Statements that will be put by the task. + final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set statements = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Alice"), + vf.createIRI("urn:WorksAt"), + vf.createIRI("urn:Taco Shop"), + vf.createIRI("urn:graph1")), + vf.createStatement( + vf.createIRI("urn:Bob"), + vf.createIRI("urn:TalksTo"), + vf.createIRI("urn:Charlie"), + vf.createIRI("urn:graph2")), + vf.createStatement( + vf.createIRI("urn:Eve"), + vf.createIRI("urn:ListensTo"), + vf.createIRI("urn:Alice"), + vf.createIRI("urn:graph1"))); + + // Create the task that will be tested. + final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask(); + + // Setup the properties that will be used to configure the task. We don't actually need to set anything + // here since we're always returning true for ryaInstanceExists(...) and use an in memory RDF store. + final Map props = new HashMap<>(); + + try { + // Start the task. + task.start(props); + + // Put the statements as a SinkRecord. 
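+            // The SinkRecord constructor used here is (topic, partition, keySchema, key, valueSchema, value, offset);
+            // the Set of Statements rides in the record's value and no Connect schemas are attached.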
+ task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, statements, 0)) ); + + // Flush the statements. + task.flush(new HashMap<>()); + + // Fetch the stored Statements to show they match the original set. + final Set fetched = new HashSet<>(); + + final Sail sail = task.makeSail(props); + try(SailConnection conn = sail.getConnection(); + CloseableIteration it = conn.getStatements(null, null, null, false)) { + while(it.hasNext()) { + fetched.add( it.next() ); + } + } + + assertEquals(statements, fetched); + + } finally { + // Stop the task. + task.stop(); + } + } + + @Test + public void multipleRecords() { + // Create the Statements that will be put by the task. + final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set batch1 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Alice"), + vf.createIRI("urn:WorksAt"), + vf.createIRI("urn:Taco Shop"), + vf.createIRI("urn:graph1")), + vf.createStatement( + vf.createIRI("urn:Bob"), + vf.createIRI("urn:TalksTo"), + vf.createIRI("urn:Charlie"), + vf.createIRI("urn:graph2"))); + + final Set batch2 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Eve"), + vf.createIRI("urn:ListensTo"), + vf.createIRI("urn:Alice"), + vf.createIRI("urn:graph1"))); + + // Create the task that will be tested. + final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask(); + + // Setup the properties that will be used to configure the task. We don't actually need to set anything + // here since we're always returning true for ryaInstanceExists(...) and use an in memory RDF store. + final Map props = new HashMap<>(); + + try { + // Start the task. + task.start(props); + + // Put the statements as SinkRecords. + final Collection records = Sets.newHashSet( + new SinkRecord("topic", 1, null, "key", null, batch1, 0), + new SinkRecord("topic", 1, null, "key", null, batch2, 1)); + task.put( records ); + + // Flush the statements. + task.flush(new HashMap<>()); + + // Fetch the stored Statements to show they match the original set. + final Set fetched = new HashSet<>(); + + final Sail sail = task.makeSail(props); + try(SailConnection conn = sail.getConnection(); + CloseableIteration it = conn.getStatements(null, null, null, false)) { + while(it.hasNext()) { + fetched.add( it.next() ); + } + } + + assertEquals(Sets.union(batch1, batch2), fetched); + + } finally { + // Stop the task. + task.stop(); + } + } + + @Test + public void flushBetweenPuts() { + // Create the Statements that will be put by the task. + final ValueFactory vf = SimpleValueFactory.getInstance(); + final Set batch1 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Alice"), + vf.createIRI("urn:WorksAt"), + vf.createIRI("urn:Taco Shop"), + vf.createIRI("urn:graph1")), + vf.createStatement( + vf.createIRI("urn:Bob"), + vf.createIRI("urn:TalksTo"), + vf.createIRI("urn:Charlie"), + vf.createIRI("urn:graph2"))); + + final Set batch2 = Sets.newHashSet( + vf.createStatement( + vf.createIRI("urn:Eve"), + vf.createIRI("urn:ListensTo"), + vf.createIRI("urn:Alice"), + vf.createIRI("urn:graph1"))); + + // Create the task that will be tested. + final InMemoryRyaSinkTask task = new InMemoryRyaSinkTask(); + + // Setup the properties that will be used to configure the task. We don't actually need to set anything + // here since we're always returning true for ryaInstanceExists(...) and use an in memory RDF store. + final Map props = new HashMap<>(); + + try { + // Start the task. + task.start(props); + + // Put the statements with flushes between them. 
+ task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch1, 0)) ); + task.flush(new HashMap<>()); + task.put( Collections.singleton(new SinkRecord("topic", 1, null, "key", null, batch2, 1)) ); + task.flush(new HashMap<>()); + + // Fetch the stored Statements to show they match the original set. + final Set fetched = new HashSet<>(); + + final Sail sail = task.makeSail(props); + try(SailConnection conn = sail.getConnection(); + CloseableIteration it = conn.getStatements(null, null, null, false)) { + while(it.hasNext()) { + fetched.add( it.next() ); + } + } + + assertEquals(Sets.union(batch1, batch2), fetched); + + } finally { + // Stop the task. + task.stop(); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/api/src/test/resources/simplelogger.properties b/extras/kafka.connect/api/src/test/resources/simplelogger.properties new file mode 100644 index 000000000..1b2131275 --- /dev/null +++ b/extras/kafka.connect/api/src/test/resources/simplelogger.properties @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.slf4j.simpleLogger.defaultLogLevel=debug diff --git a/extras/kafka.connect/client/README.md b/extras/kafka.connect/client/README.md new file mode 100644 index 000000000..c7b896316 --- /dev/null +++ b/extras/kafka.connect/client/README.md @@ -0,0 +1,21 @@ + + +This project creates a shaded executable jar that may be used to load and +read statements from a Kafka Topic in the format that the Rya Kafka Connect +Sinks expect. This tool is only meant to be used for testing/debugging Kafka +Connect integration. \ No newline at end of file diff --git a/extras/kafka.connect/client/pom.xml b/extras/kafka.connect/client/pom.xml new file mode 100644 index 000000000..1ffc8d63f --- /dev/null +++ b/extras/kafka.connect/client/pom.xml @@ -0,0 +1,113 @@ + + + + 4.0.0 + + + org.apache.rya + rya.kafka.connect.parent + 4.0.0-incubating-SNAPSHOT + + + rya.kafka.connect.client + + Apache Rya Kafka Connect - Client + Contains a client that may be used to load Statements into + a Kafka topic to be read by Kafka Connect. 
+ + + + + org.apache.rya + rya.sail + + + org.apache.rya + rya.kafka.connect.api + + + + + org.eclipse.rdf4j + rdf4j-model + + + com.google.guava + guava + + + com.beust + jcommander + + + com.github.stephenc.findbugs + findbugs-annotations + + + org.apache.kafka + kafka-clients + + + + + junit + junit + test + + + org.mockito + mockito-all + test + + + org.slf4j + slf4j-simple + test + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + org.apache.rya.kafka.connect.client.CLIDriver + + + + + + + + + + diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java new file mode 100644 index 000000000..7ebf0832e --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException; +import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand; +import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand; +import org.eclipse.rdf4j.model.Statement; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format + * the Rya Kafka Connect Sinks expect. + */ +@DefaultAnnotation(NonNull.class) +public class CLIDriver { + + /** + * Maps from command strings to the object that performs the command. 
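+ * For example, {@code read} maps to {@link ReadStatementsCommand} and {@code write} maps to
+ * {@link WriteStatementsCommand}, so a (hypothetical) invocation of this module's shaded jar could look like
+ * {@code java -jar rya.kafka.connect.client-*-shaded.jar write -b localhost:9092 -t statements -f statements.nt};
+ * the jar name, topic, and file are only examples.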
+ */ + private static final ImmutableMap COMMANDS; + static { + final Set> commandClasses = new HashSet<>(); + commandClasses.add(ReadStatementsCommand.class); + commandClasses.add(WriteStatementsCommand.class); + final ImmutableMap.Builder builder = ImmutableMap.builder(); + for(final Class commandClass : commandClasses) { + try { + final RyaKafkaClientCommand command = commandClass.newInstance(); + builder.put(command.getCommand(), command); + } catch (InstantiationException | IllegalAccessException e) { + System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor."); + e.printStackTrace(); + } + } + COMMANDS = builder.build(); + } + + private static final String USAGE = makeUsage(COMMANDS); + + public static void main(final String[] args) { + // If no command provided or the command isn't recognized, then print the usage. + if (args.length == 0 || !COMMANDS.containsKey(args[0])) { + System.out.println(USAGE); + System.exit(1); + } + + // Fetch the command that will be executed. + final String command = args[0]; + final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); + final RyaKafkaClientCommand clientCommand = COMMANDS.get(command); + + // Print usage if the arguments are invalid for the command. + if(!clientCommand.validArguments(commandArgs)) { + System.out.println(clientCommand.getUsage()); + System.exit(1); + } + + // Execute the command. + try { + clientCommand.execute(commandArgs); + } catch (ArgumentsException | ExecutionException e) { + System.err.println("The command: " + command + " failed to execute properly."); + e.printStackTrace(); + System.exit(2); + } + } + + private static String makeUsage(final ImmutableMap commands) { + final StringBuilder usage = new StringBuilder(); + usage.append("Usage: ").append(CLIDriver.class.getSimpleName()).append(" ( ... )\n"); + usage.append("\n"); + usage.append("Possible Commands:\n"); + + // Sort and find the max width of the commands. + final List sortedCommandNames = Lists.newArrayList(commands.keySet()); + Collections.sort(sortedCommandNames); + + int maxCommandLength = 0; + for (final String commandName : sortedCommandNames) { + maxCommandLength = commandName.length() > maxCommandLength ? commandName.length() : maxCommandLength; + } + + // Add each command to the usage. + final String commandFormat = " %-" + maxCommandLength + "s - %s\n"; + for (final String commandName : sortedCommandNames) { + final String commandDescription = commands.get(commandName).getDescription(); + usage.append(String.format(commandFormat, commandName, commandDescription)); + } + + return usage.toString(); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java new file mode 100644 index 000000000..8a69a0781 --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/RyaKafkaClientCommand.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.client; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A command that may be executed by the Rya Kafka Connect Client {@link CLIDriver}. + */ +@DefaultAnnotation(NonNull.class) +public interface RyaKafkaClientCommand { + + /** + * Command line parameters that are used by all commands that interact with Kafka. + */ + class KafkaParameters { + + @Parameter(names = { "--bootstrapServers", "-b" }, description = + "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.") + public String bootstrapServers = "localhost:9092"; + + @Parameter(names = { "--topic", "-t" }, required = true, description = "The Kafka topic that will be interacted with.") + public String topic; + } + + /** + * @return What a user would type into the command line to indicate + * they want to execute this command. + */ + public String getCommand(); + + /** + * @return Briefly describes what the command does. + */ + public String getDescription(); + + /** + * @return Describes what arguments may be provided to the command. + */ + default public String getUsage() { + final JCommander parser = new JCommander(new KafkaParameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + /** + * Validates a set of arguments that may be passed into the command. + * + * @param args - The arguments that will be validated. (not null) + * @return {@code true} if the arguments are valid, otherwise {@code false}. + */ + public boolean validArguments(String[] args); + + /** + * Execute the command using the command line arguments. + * + * @param args - Command line arguments that configure how the command will execute. (not null) + * @throws ArgumentsException there was a problem with the provided arguments. + * @throws ExecutionException There was a problem while executing the command. + */ + public void execute(final String[] args) throws ArgumentsException, ExecutionException; + + /** + * A {@link RyaKafkaClientCommand} could not be executed because of a problem with + * the arguments that were provided to it. + */ + public static final class ArgumentsException extends Exception { + private static final long serialVersionUID = 1L; + + public ArgumentsException(final String message) { + super(message); + } + + public ArgumentsException(final String message, final Throwable cause) { + super(message, cause); + } + } + + /** + * A {@link RyaKafkaClientCommand} could not be executed. 
+ */ + public static final class ExecutionException extends Exception { + private static final long serialVersionUID = 1L; + + public ExecutionException(final String message) { + super(message); + } + + public ExecutionException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java new file mode 100644 index 000000000..bf7a647dd --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/ReadStatementsCommand.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.client.command; + +import static java.util.Objects.requireNonNull; + +import java.util.Collections; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.kafka.connect.api.StatementsDeserializer; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand; +import org.eclipse.rdf4j.model.Statement; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.ParameterException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Reads {@link Statement}s from a Kafka topic using the Rya Kafka Connect Sink format. + */ +@DefaultAnnotation(NonNull.class) +public class ReadStatementsCommand implements RyaKafkaClientCommand { + + @Override + public String getCommand() { + return "read"; + } + + @Override + public String getDescription() { + return "Reads Statements from the specified Kafka topic."; + } + + @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new KafkaParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + @Override + public void execute(final String[] args) throws ArgumentsException, ExecutionException { + requireNonNull(args); + + // Parse the command line arguments. + final KafkaParameters params = new KafkaParameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not read the Statements from the topic because of invalid command line parameters.", e); + } + + // Set up the consumer. 
+ try(KafkaConsumer> consumer = makeConsumer(params)) { + // Subscribe to the configured topic. + consumer.subscribe(Collections.singleton(params.topic)); + + // Read the statements and write them to output. + for(final ConsumerRecord> record : consumer.poll(500)) { + for(final Statement stmt: record.value()) { + System.out.println( stmt ); + } + } + } + } + + private KafkaConsumer> makeConsumer(final KafkaParameters params) { + requireNonNull(params); + + // Configure which instance of Kafka to connect to. + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers); + + // Nothing meaningful is in the key and the values is a Set object. + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StatementsDeserializer.class); + + // Use a UUID for the Group Id so that we never register as part of the same group as another consumer. + final String groupId = UUID.randomUUID().toString(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + // Set a client id so that server side logging can be traced. + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Kafka-Connect-Client-" + groupId); + + // These consumers always start at the beginning and move forwards until the caller is finished with + // the returned stream, so never commit the consumer's progress. + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + return new KafkaConsumer<>(props); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java new file mode 100644 index 000000000..83311f52c --- /dev/null +++ b/extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.client.command; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.kafka.connect.api.StatementsSerializer; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand; +import org.apache.rya.rdftriplestore.utils.RdfFormatUtils; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.rio.UnsupportedRDFormatException; +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format. + */ +@DefaultAnnotation(NonNull.class) +public class WriteStatementsCommand implements RyaKafkaClientCommand { + private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class); + + /** + * Command line parameters that are used by this command to configure itself. + */ + public static class WriteParameters extends KafkaParameters { + @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.") + public String statementsFile; + } + + @Override + public String getCommand() { + return "write"; + } + + @Override + public String getDescription() { + return "Writes Statements to the specified Kafka topic."; + } + + @Override + public boolean validArguments(final String[] args) { + boolean valid = true; + try { + new JCommander(new WriteParameters(), args); + } catch(final ParameterException e) { + valid = false; + } + return valid; + } + + /** + * @return Describes what arguments may be provided to the command. + */ + @Override + public String getUsage() { + final JCommander parser = new JCommander(new WriteParameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + @Override + public void execute(final String[] args) throws ArgumentsException, ExecutionException { + requireNonNull(args); + + // Parse the command line arguments. + final WriteParameters params = new WriteParameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e); + } + + // Verify the configured statements file path. + final Path statementsPath = Paths.get(params.statementsFile); + if(!statementsPath.toFile().exists()) { + throw new ArgumentsException("Could not load statements at path '" + statementsPath + "' because that " + + "file does not exist. 
Make sure you've entered the correct path."); + } + + // Create an RDF Parser whose format is derived from the statementPath's file extension. + final String filename = statementsPath.getFileName().toString(); + final RDFFormat format = RdfFormatUtils.forFileName(filename); + if (format == null) { + throw new UnsupportedRDFormatException("Unknown RDF format for the file: " + filename); + } + final RDFParser parser = Rio.createParser(format); + + // Set up the producer. + try(Producer> producer = makeProducer(params)) { + // Set a handler that writes the statements to the specified kafka topic. It writes batches of 5 Statements. + parser.setRDFHandler(new AbstractRDFHandler() { + + private Set batch = new HashSet<>(5); + + @Override + public void startRDF() throws RDFHandlerException { + log.trace("Starting loading statements."); + } + + @Override + public void handleStatement(final Statement stmnt) throws RDFHandlerException { + log.trace("Adding statement."); + batch.add(stmnt); + + if(batch.size() == 5) { + flushBatch(); + } + } + + @Override + public void endRDF() throws RDFHandlerException { + if(!batch.isEmpty()) { + flushBatch(); + } + log.trace("Done."); + } + + private void flushBatch() { + log.trace("Flushing batch of size " + batch.size()); + producer.send(new ProducerRecord<>(params.topic, null, batch)); + batch = new HashSet<>(5); + producer.flush(); + } + }); + + // Do the parse and load. + try { + parser.parse(Files.newInputStream(statementsPath), ""); + } catch (RDFParseException | RDFHandlerException | IOException e) { + throw new ExecutionException("Could not load the RDF file's Statements into the Kafka topic.", e); + } + } + } + + private static Producer> makeProducer(final KafkaParameters params) { + requireNonNull(params); + final Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, params.bootstrapServers); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName()); + return new KafkaProducer<>(props); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/client/src/main/resources/log4j.properties b/extras/kafka.connect/client/src/main/resources/log4j.properties new file mode 100644 index 000000000..b07468cde --- /dev/null +++ b/extras/kafka.connect/client/src/main/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/extras/kafka.connect/mongo-it/README.md b/extras/kafka.connect/mongo-it/README.md new file mode 100644 index 000000000..b154b951e --- /dev/null +++ b/extras/kafka.connect/mongo-it/README.md @@ -0,0 +1,19 @@ + + +This project contains integration tests that verify a Mongo DB backed +implementation of the Rya Kafka Connect Sink is working properly. \ No newline at end of file diff --git a/extras/kafka.connect/mongo-it/pom.xml b/extras/kafka.connect/mongo-it/pom.xml new file mode 100644 index 000000000..ca439ea52 --- /dev/null +++ b/extras/kafka.connect/mongo-it/pom.xml @@ -0,0 +1,62 @@ + + + + 4.0.0 + + + org.apache.rya + rya.kafka.connect.parent + 4.0.0-incubating-SNAPSHOT + + + rya.kafka.connect.mongo.it + + Apache Rya Kafka Connect - Mongo DB Integration Tests + Tests the Kafka Connect Sink that writes to a Rya instance backed by Mongo DB. + + + + + org.apache.rya + rya.kafka.connect.mongo + + + + + org.apache.kafka + connect-api + provided + + + + + junit + junit + test + + + org.apache.rya + rya.test.mongo + test + + + \ No newline at end of file diff --git a/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java new file mode 100644 index 000000000..55e7603b9 --- /dev/null +++ b/extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.test.mongo.MongoITBase; +import org.junit.Test; + +/** + * Integration tests the methods of {@link MongoRyaSinkTask}. + */ +public class MongoRyaSinkTaskIT extends MongoITBase { + + @Test + public void instanceExists() throws Exception { + // Install an instance of Rya. 
+ final String ryaInstanceName = "rya"; + final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( + super.getMongoHostname(), + super.getMongoPort(), + Optional.empty(), + Optional.empty()); + + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableEntityCentricIndex(false) + .setEnableFreeTextIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(false) + .setEnableGeoIndex(false) + .build(); + + final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient()); + ryaClient.getInstall().install(ryaInstanceName, installConfig); + + // Create the task that will be tested. + final MongoRyaSinkTask task = new MongoRyaSinkTask(); + + try { + // Configure the task to use the embedded Mongo DB instance for Rya. + final Map config = new HashMap<>(); + config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); + config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); + config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya"); + + // This will pass because the Rya instance exists. + task.start(config); + } finally { + task.stop(); + } + } + + @Test(expected = ConnectException.class) + public void instanceDoesNotExist() throws Exception { + // Create the task that will be tested. + final MongoRyaSinkTask task = new MongoRyaSinkTask(); + + try { + // Configure the task to use the embedded Mongo DB instance for Rya. + final Map config = new HashMap<>(); + config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); + config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); + config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist"); + + // Starting the task will fail because the Rya instance does not exist. + task.start(config); + } finally { + task.stop(); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/mongo/README.md b/extras/kafka.connect/mongo/README.md new file mode 100644 index 000000000..03b2c9b96 --- /dev/null +++ b/extras/kafka.connect/mongo/README.md @@ -0,0 +1,23 @@ + + +This project is the Rya Kafka Connect Sink that writes to Mongo DB backed +instances of Rya. + +This project produces a shaded jar that may be installed into Kafka Connect. +For more information about how to install and configure this connector, see +[the manual](../../rya.manual/src/site/markdown/kafka-connect-integration.md). \ No newline at end of file diff --git a/extras/kafka.connect/mongo/pom.xml b/extras/kafka.connect/mongo/pom.xml new file mode 100644 index 000000000..66eba1b9d --- /dev/null +++ b/extras/kafka.connect/mongo/pom.xml @@ -0,0 +1,79 @@ + + + + 4.0.0 + + + org.apache.rya + rya.kafka.connect.parent + 4.0.0-incubating-SNAPSHOT + + + rya.kafka.connect.mongo + + Apache Rya Kafka Connect - Mongo DB + A Kafka Connect Sink that writes to a Rya instance backed by Mongo DB. 
+ + + + + org.apache.rya + rya.kafka.connect.api + + + org.apache.rya + rya.indexing + + + + + org.apache.kafka + connect-api + provided + + + + + junit + junit + test + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + + + \ No newline at end of file diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java new file mode 100644 index 000000000..3b48556ba --- /dev/null +++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConfig extends RyaSinkConfig { + + public static final String HOSTNAME = "mongo.hostname"; + private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections will use."; + + public static final String PORT = "mongo.port"; + private static final String PORT_DOC = "The Mongo DB port the Sail connections will use."; + + public static final String USERNAME = "mongo.username"; + private static final String USERNAME_DOC = "The Mongo DB username the Sail connections will use."; + + public static final String PASSWORD = "mongo.password"; + private static final String PASSWORD_DOC = "The Mongo DB password the Sail connections will use."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(HOSTNAME, Type.STRING, Importance.HIGH, HOSTNAME_DOC) + .define(PORT, Type.INT, Importance.HIGH, PORT_DOC) + .define(USERNAME, Type.STRING, "", Importance.HIGH, USERNAME_DOC) + .define(PASSWORD, Type.PASSWORD, "", Importance.HIGH, PASSWORD_DOC); + static { + RyaSinkConfig.addCommonDefinitions(CONFIG_DEF); + } + + /** + * Constructs an instance of {@link MongoRyaSinkConfig}. + * + * @param originals - The key/value pairs that define the configuration. (not null) + */ + public MongoRyaSinkConfig(final Map originals) { + super(CONFIG_DEF, originals); + } + + /** + * @return The Mongo DB hostname the Sail connections wlll use. 
+ */ + public String getHostname() { + return super.getString(HOSTNAME); + } + + /** + * @return The Mongo DB port the Sail connections will use. + */ + public int getPort() { + return super.getInt(PORT); + } + + /** + * @return The Mongo DB username the Sail connections will use. + */ + public String getUsername() { + return super.getString(USERNAME); + } + + /** + * @return The Mongo DB password the Sail connections will use. + */ + public String getPassword() { + return super.getPassword(PASSWORD).value(); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java new file mode 100644 index 000000000..fd91d07e0 --- /dev/null +++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConnector extends RyaSinkConnector { + + @Nullable + private MongoRyaSinkConfig config = null; + + @Override + public void start(final Map props) { + this.config = new MongoRyaSinkConfig( props ); + } + + @Override + protected AbstractConfig getConfig() { + if(config == null) { + throw new IllegalStateException("The configuration has not been set yet. Invoke start(Map) first."); + } + return config; + } + + @Override + public Class taskClass() { + return MongoRyaSinkTask.class; + } + + @Override + public ConfigDef config() { + return MongoRyaSinkConfig.CONFIG_DEF; + } +} \ No newline at end of file diff --git a/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java new file mode 100644 index 000000000..6887fdb75 --- /dev/null +++ b/extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.log.LogUtils; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + + @Override + protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException { + requireNonNull(taskConfig); + + // Parse the configuration object. + final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); + @Nullable + final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); + @Nullable + final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray(); + + // Connect a Mongo Client to the configured Mongo DB instance. + final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); + final boolean hasCredentials = username != null && password != null; + + try(MongoClient mongoClient = hasCredentials ? + new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : + new MongoClient(serverAddr)) { + // Use a RyaClient to see if the configured instance exists. + // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. 
+ final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( + config.getHostname(), + config.getPort(), + Optional.ofNullable(username), + Optional.ofNullable(password)); + + final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); + if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { + throw new ConnectException("The Rya Instance named " + + LogUtils.clean(config.getRyaInstanceName()) + " has not been installed."); + } + } catch(final RyaClientException e) { + throw new ConnectException("Unable to determine if the Rya Instance named " + + LogUtils.clean(config.getRyaInstanceName()) + " has been installed.", e); + } + } + + @Override + protected Sail makeSail(final Map taskConfig) { + requireNonNull(taskConfig); + + // Parse the configuration object. + final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); + + // Move the configuration into a Rya Configuration object. + final MongoDBRdfConfiguration ryaConfig = new MongoDBRdfConfiguration(); + ConfigUtils.setUseMongo(ryaConfig, true); + ryaConfig.setMongoDBName( config.getRyaInstanceName() ); + ryaConfig.setTablePrefix( config.getRyaInstanceName() ); + ryaConfig.setMongoHostname( config.getHostname() ); + ryaConfig.setMongoPort( "" + config.getPort() ); + + if(!Strings.isNullOrEmpty(config.getUsername()) && !Strings.isNullOrEmpty(config.getPassword())) { + ryaConfig.setMongoUser( config.getUsername() ); + ryaConfig.setMongoPassword( config.getPassword() ); + } + + // Create the Sail object. + try { + return RyaSailFactory.getInstance(ryaConfig); + } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new ConnectException("Could not connect to the Rya Instance named " + config.getRyaInstanceName(), e); + } + } +} \ No newline at end of file diff --git a/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java b/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java new file mode 100644 index 000000000..d6c7c9683 --- /dev/null +++ b/extras/kafka.connect/mongo/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfigTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; +import org.junit.Test; + +/** + * Unit tests the methods of {@link MongoRyaSinkConfig}. 
+ */ +public class MongoRyaSinkConfigTest { + + @Test + public void parses() { + final Map properties = new HashMap<>(); + properties.put(MongoRyaSinkConfig.HOSTNAME, "127.0.0.1"); + properties.put(MongoRyaSinkConfig.PORT, "27017"); + properties.put(MongoRyaSinkConfig.USERNAME, "alice"); + properties.put(MongoRyaSinkConfig.PASSWORD, "alice1234!@"); + properties.put(RyaSinkConfig.RYA_INSTANCE_NAME, "rya"); + new MongoRyaSinkConfig(properties); + } +} \ No newline at end of file diff --git a/extras/kafka.connect/pom.xml b/extras/kafka.connect/pom.xml new file mode 100644 index 000000000..9a9702cc0 --- /dev/null +++ b/extras/kafka.connect/pom.xml @@ -0,0 +1,66 @@ + + + + 4.0.0 + + + org.apache.rya + rya.extras + 4.0.0-incubating-SNAPSHOT + + + rya.kafka.connect.parent + + Apache Rya Kafka Connect Parent + The parent pom file for any Rya Kafka Connect project. + + pom + + + api + accumulo + accumulo-it + mongo + mongo-it + client + + + + 1.1.0 + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + ${project.version} + + + + + + + \ No newline at end of file diff --git a/extras/pom.xml b/extras/pom.xml index bb6f914c9..65dd4cca1 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -45,6 +45,7 @@ under the License. rya.merger rya.streams rya.forwardchain + kafka.connect diff --git a/extras/rya.manual/src/site/markdown/_index.md b/extras/rya.manual/src/site/markdown/_index.md index d0b61c432..07dfe5035 100644 --- a/extras/rya.manual/src/site/markdown/_index.md +++ b/extras/rya.manual/src/site/markdown/_index.md @@ -33,6 +33,7 @@ This project contains documentation about Apache Rya, a scalable RDF triple stor - [Inferencing](infer.md) - [MapReduce Interface](mapreduce.md) - [Rya Streams](rya-streams.md) +- [Kafka Connect Integration](kafka-connect-integration.md) # Samples - [Typical First Steps](sm-firststeps.md) diff --git a/extras/rya.manual/src/site/markdown/index.md b/extras/rya.manual/src/site/markdown/index.md index 537261894..e686736bb 100644 --- a/extras/rya.manual/src/site/markdown/index.md +++ b/extras/rya.manual/src/site/markdown/index.md @@ -35,6 +35,7 @@ This project contains documentation about Apache Rya, a scalable RDF triple stor - [Shell Interface](shell.md) - [Incremental Join Maintenance Application (PCJ Updater)](pcj-updater.md) - [Rya Streams](rya-streams.md) +- [Kafka Connect Integration](kafka-connect-integration.md) # Samples - [Typical First Steps](sm-firststeps.md) diff --git a/extras/rya.manual/src/site/markdown/kafka-connect-integration.md b/extras/rya.manual/src/site/markdown/kafka-connect-integration.md new file mode 100644 index 000000000..ed31c3306 --- /dev/null +++ b/extras/rya.manual/src/site/markdown/kafka-connect-integration.md @@ -0,0 +1,493 @@ + + +# Kafka Connect Integration # + +Introduced in 4.0.0 + +# Table of Contents # +- [Introduction](#introduction) +- [An Important Note About Deploying the Plugins](#an-important-note-about-deploying-the-plugins) +- [Statement Serialization Format](#statement-serialization-format) +- [Quick Start](#quick-start) +- [Future Work](#future-work) + +

+ +## Introduction ## + +[Kafka Connect](https://kafka.apache.org/documentation/#connect) is a system +that is able to pull data from data sources into Kafka topics as well as write +data from Kafka topics into data sinks. This project implements a Kafka +Connector Sink for both Accumulo backed and Mongo backed instances of Rya. + +
+ +## An Important Note About Deploying the Plugins ## + +While testing the application with both the Mongo Rya Sink and Accumulo Rya Sink +uber jars installed, we were seeing ClassCastExceptions being thrown when some +code was trying to cast a ContextStatement into a Statement. Normally, this +wouldn't cause a problem. However, within Connect, this was caused by both uber +jars containing a copy of the ContextStatement and Statement classes. Different +Classloaders loaded each of those classes and the relationship between them +was lost. + +For now, it's important that you only deploy one of the uber jars at a time. + +
+
+## Statement Serialization Format ##
+
+Applications that would like to write to a Kafka topic in a format that
+the sink is able to recognize must write ```Set<Statement>``` objects using the
+[StatementsSerializer](../../../../../extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java).
+
+Rather than using the Confluent Schema Registry and Avro to serialize Statements,
+we're going with RDF4J's Rio Binary Format. You may read more about how that
+format is implemented [here](http://docs.rdf4j.org/rdf4j-binary/).
+
+
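+As a rough illustration, here is a minimal sketch of such a producer. It roughly mirrors
+what the ```rya.kafka.connect.client``` tool does internally: String keys, values serialized
+with the ```StatementsSerializer```, and a ```Set<Statement>``` payload per record. The broker
+address, topic name, and class name below are placeholders for this example, not part of the
+project.
+
+```java
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+
+public class StatementsProducerExample {
+    public static void main(final String[] args) {
+        // Serialize each record's value using the sink's binary Statement format.
+        final Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StatementsSerializer.class.getName());
+
+        // Build a small batch of Statements to write.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set<Statement> statements = new HashSet<>();
+        statements.add(vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")));
+
+        // Each record's value is an entire batch of Statements.
+        try(Producer<String, Set<Statement>> producer = new KafkaProducer<>(props)) {
+            producer.send(new ProducerRecord<>("statements", null, statements));
+            producer.flush();
+        }
+    }
+}
+```
+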
+
+## Quick Start ##
+
+This tutorial demonstrates how to install and start the Accumulo Rya Sink and
+the Mongo Rya Sink by using the Open Source version of the Confluent platform.
+You can download it [here](https://www.confluent.io/download/). We're going to
+use the standalone version of Connect, but in a production environment you may
+want to use the distributed mode if there is a lot of data that needs to be
+inserted.
+
+We suggest you go through the
+[Confluent Platform Open Source Quick Start](https://docs.confluent.io/current/quickstart/cos-quickstart.html),
+so that you can ensure the Confluent platform is installed and ready for use.
+We're using Confluent 4.1.0 in this tutorial, so be aware some things may change
+when using newer versions of the platform. You may also find it beneficial to
+go through the [Kafka Connect Quick Start](https://docs.confluent.io/current/connect/quickstart.html)
+as well to see how Kafka Connect works in general.
+
+### Step 1: Download the applications ###
+
+You can fetch the artifacts you need to follow this Quick Start from our
+[downloads page](http://rya.apache.org/download/). Click on the release of
+interest and follow the "Central repository for Maven and other dependency
+managers" URL.
+
+Fetch the following four artifacts:
+
+Artifact Id | Type
+--- | ---
+rya.shell | shaded jar
+rya.kafka.connect.client | shaded jar
+rya.kafka.connect.accumulo | shaded jar
+rya.kafka.connect.mongo | shaded jar
+
+### Step 2: Load statements into a Kafka topic ###
+
+The sink connector that we will be demonstrating reads Statements from
+a Kafka topic and loads them into an instance of Rya. Just to keep things simple,
+let's create a topic that only has a single partition and is unreplicated. Within
+a production environment, you will want to tune these values based on how many
+concurrent workers you would like to use when processing input.
+
+```
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic statements
+```
+
+Next we need to create a file that contains the statements we will load into the topic.
+Name the file "quickstart-statements.nt" and use a text editor to write the following lines to it:
+
+```
+<urn:Alice> <urn:talksTo> <urn:Bob> .
+<urn:Bob> <urn:talksTo> <urn:Alice> .
+<urn:Bob> <urn:talksTo> <urn:Charlie> .
+<urn:Charlie> <urn:talksTo> <urn:Alice> .
+<urn:David> <urn:talksTo> <urn:Eve> .
+<urn:Eve> <urn:listensTo> <urn:Bob> .
+```
+
+Use the ```rya.kafka.connect.client``` shaded jar to write the file's contents to the topic we just made.
+
+```
+java -jar rya.kafka.connect.client-4.0.0-incubating-shaded.jar write -f quickstart-statements.nt -t statements
+```
+
+You may verify the statements were written by using the read command.
+
+```
+java -jar rya.kafka.connect.client-4.0.0-incubating-shaded.jar read -t statements
+```
+
+At this point you need to decide whether you are going to use an Accumulo or
+MongoDB backed instance of Rya. The following steps are pretty much the same
+for both backends, but they require different jars and commands. To follow
+the Accumulo set of steps, start with __Accumulo Step 3__ and go through
+__Accumulo Step 5__. To follow the Mongo set of steps, skip ahead
+to __Mongo Step 3__ and go through __Mongo Step 5__.
+
+### Accumulo Step 3: Installing a Rya instance ###
+
+The sink needs a place to put the Statements that we just wrote to the Kafka topic.
+We're going to have it write to a Rya instance named "quickstart" on your Accumulo
+cluster. To do this, you'll need to use the Rya Shell. Here's roughly what an
+installation session should look like.
+ +``` +java -jar rya.shell-4.0.0-incubating-shaded.jar + + _____ _____ _ _ _ +| __ \ / ____| | | | | +| |__) | _ __ _ | (___ | |__ ___| | | +| _ / | | |/ _` | \___ \| '_ \ / _ \ | | +| | \ \ |_| | (_| | ____) | | | | __/ | | +|_| \_\__, |\__,_| |_____/|_| |_|\___|_|_| + __/ | + |___/ +4.0.0-incubating + +Welcome to the Rya Shell. + +Execute one of the connect commands to start interacting with an instance of Rya. +You may press tab at any time to see which of the commands are available. + +rya>connect-accumulo --zookeepers localhost --instanceName quickstart_instance --username quickstart +Password: ******* + +rya/quickstart_instance> install +Rya Instance Name [default: rya_]: quickstart +Use Shard Balancing (improves streamed input write speeds) [default: false]: f +Use Entity Centric Indexing [default: true]: f +Use Free Text Indexing [default: true]: f +Use Temporal Indexing [default: true]: f +Use Precomputed Join Indexing [default: true]: f + +A Rya instance will be installed using the following values: + Instance Name: quickstart + Use Shard Balancing: false + Use Entity Centric Indexing: false + Use Free Text Indexing: false + Use Temporal Indexing: false + Use Precomputed Join Indexing: false + +Continue with the install? (y/n) y +The Rya instance named 'quickstart' has been installed. + +``` +We also want to ensure the instance we just installed does not have any Statements +yet. We can do this using that same shell instance. + +``` +# 2. Verify no data has been inserted yet. +rya/quickstart_instance> connect-rya --instance quickstart +rya/quickstart_instance:quickstart> sparql-query +Enter a SPARQL Query. +Type '\e' to execute the current query. +Type '\c' to clear the current query. +SPARQL> select * where { ?s ?p ?o .}\e +Executing Query... +Query Results: + +rya/quickstart_instance:quickstart> +``` + +### Accumulo Step 4: Installing and running the Accumulo Rya Sink ### + +At this point we have a kafka topic that is filled with RDF Statements that +need to be loaded into Rya. We also have a Rya instance for them to be written +to. All that is left is to install the Accumulo Rya Sink, configure it to +use those two endpoints, and then load it. + +The version of the Confluent platform we used for this quick start doesn't seem +to be able to find new connector installs dynamically, so start by shutting +everything down. + +``` +confluent stop +``` + +Install the shaded jar that contains the Accumulo Rya Sink connector. + +``` +mkdir confluent-4.1.0/share/java/kafka-connect-rya-accumulo +cp rya.kafka.connect.accumulo-4.0.0-incubating-shaded.jar confluent-4.1.0/share/java/kafka-connect-rya-accumulo +``` + +Then we need to configure the connector to read from the "statements" topic, +specify which Accumulo cluster is hosting the Rya Instance, which Rya Instance +to write to, and specify the classes that define the Connector and the Converter +to use. This file has clear text passwords in it, so ensure it has appropriate +access restrictions applied to it. 
+ +``` +touch confluent-4.1.0/etc/kafka/connect-rya-accumulo-sink.properties +``` + +And then use your favorite text editor to fill in the following values: + +``` +name=rya-accumulo-sink +connector.class=org.apache.rya.kafka.connect.accumulo.AccumuloRyaSinkConnector +tasks.max=1 +value.converter=org.apache.rya.kafka.connect.api.StatementsConverter +topics=statements +accumulo.zookeepers=127.0.0.1 +accumulo.cluster.name= +accumulo.username= +accumulo.password= +rya.instance.name=quickstart +``` + +Start the Confluent platform: + +``` +confluent start +``` + +Even after the start command says everything is started, it may take a moment for +the load command to work. Rerun this command until you get a response from the +REST service printed to the screen and confluent reports it as loaded: + +``` +confluent load rya-accumulo-sink -d confluent-4.1.0/etc/kafka/connect-rya-accumulo-sink.properties +``` + +The connector will automatically start workers that load the data from the +configured topic into the configured Rya instance. + +### Accumulo Step 5: Verify statements were written to Rya ### + +At this point you should be able to rerun the query from __Accumulo Step 3__ and +see that Statements have been added to the Rya instance. + +``` +rya/quickstart_instance:quickstart> sparql-query +Enter a SPARQL Query. +Type '\e' to execute the current query. +Type '\c' to clear the current query. +SPARQL> select * where { ?s ?p ?o . }\e +Executing Query... +Query Results: +p,s,o +urn:talksTo,urn:Alice,urn:Bob +urn:talksTo,urn:Bob,urn:Alice +urn:talksTo,urn:Bob,urn:Charlie +urn:talksTo,urn:Charlie,urn:Alice +urn:talksTo,urn:David,urn:Eve +urn:listensTo,urn:Eve,urn:Bob +Done. +``` + +### Mongo Step 3: Installing a Rya instance ### + +The sink needs a place to put the Statements that we just wrote to the kafka topic. +We're going to have it write to a Rya instance named "quickstart" within your +Mongo database. To do this, you'll need to use the Rya Shell. Here's roughly +what an installation session should look like. + +``` +[root@localhost ~]# java -jar rya.shell-4.0.0-incubating-SNAPSHOT-shaded.jar + _____ _____ _ _ _ +| __ \ / ____| | | | | +| |__) | _ __ _ | (___ | |__ ___| | | +| _ / | | |/ _` | \___ \| '_ \ / _ \ | | +| | \ \ |_| | (_| | ____) | | | | __/ | | +|_| \_\__, |\__,_| |_____/|_| |_|\___|_|_| + __/ | + |___/ +4.0.0-incubating-SNAPSHOT + +Welcome to the Rya Shell. + +Execute one of the connect commands to start interacting with an instance of Rya. +You may press tab at any time to see which of the commands are available. + +rya> connect-mongo --hostname localhost --port 27017 +Connected. You must select a Rya instance to interact with next. + +rya/localhost> install +Rya Instance Name [default: rya_]: quickstart +Use Free Text Indexing [default: true]: f +Use Temporal Indexing [default: true]: f +Use PCJ Indexing [default: true]: f + +A Rya instance will be installed using the following values: + Instance Name: quickstart + Use Free Text Indexing: false + Use Temporal Indexing: false + Use PCJ Indexing: false + +Continue with the install? (y/n) y +The Rya instance named 'quickstart' has been installed. +``` +We also want to ensure the instance we just installed does not have any Statements +yet. We can do this using that same shell instance. 
+
+```
+rya/localhost> connect-rya --instance quickstart
+rya/localhost:quickstart> select * where { ?s ?p ?o .}\e
+Command 'select * where { ?s ?p ?o .}\e' not found (for assistance press TAB)
+rya/localhost:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o .}\e
+Executing Query...
+No Results Found.
+Done.
+rya/localhost:quickstart>
+```
+
+### Mongo Step 4: Installing and running the Mongo Rya Sink ###
+
+At this point we have a Kafka topic that is filled with RDF Statements that
+need to be loaded into Rya. We also have a Rya instance for them to be written
+to. All that is left is to install the Mongo Rya Sink, configure it to
+use those two endpoints, and then load it.
+
+The version of the Confluent platform we used for this quick start doesn't seem
+to be able to find new connector installs dynamically, so start by shutting
+everything down.
+
+```
+confluent stop
+```
+
+Install the shaded jar that contains the Mongo Rya Sink connector.
+
+```
+mkdir confluent-4.1.0/share/java/kafka-connect-rya-mongo
+cp rya.kafka.connect.mongo-4.0.0-incubating-shaded.jar confluent-4.1.0/share/java/kafka-connect-rya-mongo
+```
+
+Then we need to configure the connector to read from the "statements" topic,
+specify which Mongo database is hosting the Rya Instance, which Rya Instance
+to write to, and specify the classes that define the Connector and the Converter
+to use. This file has clear text passwords in it, so ensure it has appropriate
+access restrictions applied to it.
+
+```
+touch confluent-4.1.0/etc/kafka/connect-rya-mongo-sink.properties
+```
+
+And then use your favorite text editor to fill in the following values:
+
+```
+name=rya-mongo-sink
+connector.class=org.apache.rya.kafka.connect.mongo.MongoRyaSinkConnector
+tasks.max=1
+value.converter=org.apache.rya.kafka.connect.api.StatementsConverter
+topics=statements
+mongo.hostname=127.0.0.1
+mongo.port=27017
+mongo.username=
+mongo.password=
+rya.instance.name=quickstart
+```
+
+Start the Confluent platform:
+
+```
+confluent start
+```
+
+Even after the start command says everything is started, it may take a moment for
+the load command to work. Rerun this command until you get a response from the
+REST service printed to the screen and confluent reports it as loaded:
+
+```
+confluent load rya-mongo-sink -d confluent-4.1.0/etc/kafka/connect-rya-mongo-sink.properties
+```
+
+The connector will automatically start workers that load the data from the
+configured topic into the configured Rya instance.
+
+### Mongo Step 5: Verify statements were written to Rya ###
+
+At this point you should be able to rerun the query from __Mongo Step 3__ and
+see that Statements have been added to the Rya instance.
+
+```
+rya/localhost:quickstart> sparql-query
+Enter a SPARQL Query.
+Type '\e' to execute the current query.
+Type '\c' to clear the current query.
+SPARQL> select * where { ?s ?p ?o . }\e
+Executing Query...
+Query Results:
+p,s,o
+urn:talksTo,urn:Alice,urn:Bob
+urn:talksTo,urn:Bob,urn:Charlie
+urn:talksTo,urn:Charlie,urn:Alice
+urn:talksTo,urn:Bob,urn:Alice
+urn:talksTo,urn:David,urn:Eve
+urn:listensTo,urn:Eve,urn:Bob
+Done.
+```
+
+ +## Future Work ## + +### Remove passwords from connector configuration files ### + +It's a security flaw that the connector's passwords for connecting to Accumulo +and Mongo are in clear text within the configuration files. The log files hide +them when they log the configuration, but it's still written to standard out +when the use the confluent command to load the connector. There should be +another way for the connector to receive the credentials required to connect. + +### Support both Mongo and Accumulo connectors at the same time ### + +Currently, you can only use the Mongo Rya Sink or the Accumulo Rya Sink because +of the problem mentioned in +[An Important Note About Deploying the Plugins](#an-important-note-about-deploying-the-plugins). + +We could have a single uber jar plugin that supports both backends. We could also +just not use uber jars and include all of the jars that the plugins depend on +in a single folder. + +### Visibility Statement Support ### + +The sinks are able to write Statement objects, but that means none of those +statements are allowed to have visibility expressions. If they do, then the +visibilities will be dropped when the statement is inserted. + +It would be nice if the Rya implementation of the RDF4J RepositoryConnection +were able to figure out when a Statement is actually a VisibilityStatement. It +could then retain the visibility expression when it is persisted. + +### Visibility Binding Set Sink ### + +The Visibility Binding Sets that are stored within the Precomputed Join index +could be generated by an external system (such as Rya Streams) and written to +a Kafka topic. It would be convenient to also have a connector that is able to +write those values to the index. + +### Rya Kafka Connect Source ### + +Rya Streams would benefit from having a Kafka Connect Source that is able to +determine when new Statements have been added to the core tables, and then write +those Visibility Statements to a Kafka topic. + +### Geo Indexing ### + +It's difficult to get the Geo indexing into the Sail object that represents +Rya because the geo project is optional. While optional, we don't use dependency +injection to get the GeoRyaSailFactory into the application instead of the +normal Rya Sail Factory. An improvement to this project would be to resolve +that problem so that it may do geo indexing while inserting statements. \ No newline at end of file diff --git a/extras/rya.manual/src/site/site.xml b/extras/rya.manual/src/site/site.xml index 2af3373e1..a102902d0 100644 --- a/extras/rya.manual/src/site/site.xml +++ b/extras/rya.manual/src/site/site.xml @@ -48,7 +48,8 @@ under the License. - + + diff --git a/pom.xml b/pom.xml index 699fccf7d..b46110f27 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,7 @@ under the License. 1.0-1 0.10.0.1 3.0.3 + 1.1 true @@ -554,6 +555,21 @@ under the License. rya.test.kafka ${project.version} + + org.apache.rya + rya.kafka.connect.api + ${project.version} + + + org.apache.rya + rya.kafka.connect.accumulo + ${project.version} + + + org.apache.rya + rya.kafka.connect.mongo + ${project.version} + org.apache.accumulo accumulo-core @@ -564,11 +580,6 @@ under the License. accumulo-start ${accumulo.version} - - org.eclipse.rdf4j - rdf4j-runtime-osgi - ${org.eclipse.rdf4j.version} - org.eclipse.rdf4j rdf4j-runtime @@ -604,6 +615,22 @@ under the License. 
rdf4j-queryresultio-text ${org.eclipse.rdf4j.version} + + + org.eclipse.rdf4j + rdf4j-rio-api + ${org.eclipse.rdf4j.version} + + + org.eclipse.rdf4j + rdf4j-rio-binary + ${org.eclipse.rdf4j.version} + + + org.eclipse.rdf4j + rdf4j-rio-datatypes + ${org.eclipse.rdf4j.version} + org.eclipse.rdf4j rdf4j-rio-nquads @@ -707,6 +734,11 @@ under the License. slf4j-api ${slf4j.version} + + org.slf4j + slf4j-simple + ${slf4j.version} + org.slf4j slf4j-log4j12 @@ -1073,6 +1105,11 @@ under the License. + + org.apache.kafka + connect-api + ${kafka.version} + org.apache.kafka kafka_2.11 @@ -1084,6 +1121,11 @@ under the License. kryo ${kryo.version} + + com.jcabi + jcabi-manifests + ${jcabi-manifeses.version} +