From cbb1947542d4aad9e006ccebe66239c53bf3e519 Mon Sep 17 00:00:00 2001 From: "David W. Lotts" Date: Tue, 1 Nov 2016 17:37:07 -0400 Subject: [PATCH 1/2] RYA-128 closes RYA-128 trigger service to Kafka. --- .../rya/indexing/pcj/fluo/api/CreatePcj.java | 18 +- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 48 ++- .../app/export/rya/BindingSetSerializer.java | 137 +++++++++ .../app/export/rya/KafkaExportParameters.java | 84 +++++ .../app/export/rya/KafkaResultExporter.java | 75 +++++ .../rya/KafkaResultExporterFactory.java | 64 ++++ .../app/observers/QueryResultObserver.java | 14 +- .../export/rya/KafkaExportParametersTest.java | 97 ++++++ .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 34 ++ .../apache/rya/indexing/pcj/fluo/ITBase.java | 30 +- .../pcj/fluo/integration/KafkaExportIT.java | 290 ++++++++++++++++++ extras/rya.prospector/pom.xml | 39 +++ 12 files changed, 905 insertions(+), 25 deletions(-) create mode 100644 extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java create mode 100644 extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java create mode 100644 extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java create mode 100644 extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java create mode 100644 extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java create mode 100644 extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index 656737163..d29191dc4 
100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -132,12 +132,12 @@ public CreatePcj(final int spInsertBatchSize) { * @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}. * @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}. */ - public void withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, - final Connector accumulo, String ryaInstance ) - throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException { - requireNonNull(pcjId); - requireNonNull(pcjStorage); - requireNonNull(fluo); + public String withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, + final Connector accumulo, String ryaInstance ) + throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); requireNonNull(accumulo); requireNonNull(ryaInstance); @@ -162,13 +162,16 @@ public void withRyaIntegration(final String pcjId, final PrecomputedJoinStorage final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + // return queryId to the caller for later monitoring from the export. + String queryId = null; + try (Transaction tx = fluo.newTransaction()) { // Write the query's structure to Fluo. new FluoQueryMetadataDAO().write(tx, fluoQuery); // The results of the query are eventually exported to an instance // of Rya, so store the Rya ID for the PCJ. 
- final String queryId = fluoQuery.getQueryMetadata().getNodeId(); + queryId = fluoQuery.getQueryMetadata().getNodeId(); tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); @@ -206,6 +209,7 @@ public void withRyaIntegration(final String pcjId, final PrecomputedJoinStorage writeBatch(fluo, triplesBatch); triplesBatch.clear(); } + return queryId; } diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index fd2e582ca..de510084f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -33,7 +33,9 @@ under the License. A Fluo implementation of Rya Precomputed Join Indexing. This module produces a jar that may be executed by the 'fluo' command line tool as a YARN job. - + + 3.0.3 + @@ -62,6 +64,50 @@ under the License. + + + org.apache.kafka + kafka-clients + 0.10.1.0 + + + org.apache.kafka + kafka_2.11 + 0.10.1.0 + + + slf4j-log4j12 + org.slf4j + + + + + com.esotericsoftware + kryo + ${kryo.version} + + + + + org.apache.kafka + kafka-clients + 0.10.1.0 + test + + + org.apache.kafka + kafka_2.11 + 0.10.1.0 + test + + + + slf4j-log4j12 + org.slf4j + + + + junit diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java new file mode 100644 index 000000000..7b35fecc0 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.ListBindingSet; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class BindingSetSerializer implements Serializer, Deserializer { + private static final ThreadLocal kryos = new ThreadLocal() { + @Override + protected Kryo initialValue() { + Kryo kryo = new Kryo(); + return kryo; + }; + }; + + @Override + public VisibilityBindingSet deserialize(String topic, byte[] data) { + KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer(); + Input input = new Input(new ByteArrayInputStream(data)); + return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class); + // this is an alternative, or perhaps replace it: + // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null); + } + + @Override + public void configure(Map configs, boolean isKey) { + // Do nothing. + } + + @Override + public byte[] serialize(String topic, VisibilityBindingSet data) { + KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + internalSerializer.write(kryos.get(), output, data); + output.flush(); + byte[] array = baos.toByteArray(); + return array; + // this is an alternative, or perhaps replace it: + // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8); + } + + @Override + public void close() { + // Do nothing. + } + + private static Value makeValue(final String valueString, final URI typeURI) { + // Convert the String Value into a Value. + final ValueFactory valueFactory = ValueFactoryImpl.getInstance(); + if (typeURI.equals(XMLSchema.ANYURI)) { + return valueFactory.createURI(valueString); + } else { + return valueFactory.createLiteral(valueString, typeURI); + } + } + + /** + * De/Serialize a visibility binding set using the Kryo library. + * TODO rename this KryoSomething and change the package. + * + */ + private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer { + private static final Logger log = Logger.getLogger(BindingSetSerializer.class); + @Override + public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) { + log.debug("Serializer writing visBindingSet" + visBindingSet); + output.writeString(visBindingSet.getVisibility()); + // write the number count for the reader. 
+ output.writeInt(visBindingSet.size()); + for (Binding binding : visBindingSet) { + output.writeString(binding.getName()); + final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue()); + final String valueString = ryaValue.getData(); + final URI type = ryaValue.getDataType(); + output.writeString(valueString); + output.writeString(type.toString()); + } + } + + @Override + public VisibilityBindingSet read(Kryo kryo, Input input, Class aClass) { + log.debug("Serializer reading visBindingSet"); + String visibility = input.readString(); + int bindingCount = input.readInt(); + ArrayList namesList = new ArrayList(bindingCount); + ArrayList valuesList = new ArrayList(bindingCount); + for (int i = bindingCount; i > 0; i--) { + namesList.add(input.readString()); + String valueString = input.readString(); + final URI type = new URIImpl(input.readString()); + valuesList.add(makeValue(valueString, type)); + } + BindingSet bindingSet = new ListBindingSet(namesList, valuesList); + return new VisibilityBindingSet(bindingSet, visibility); + } + } +} diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java new file mode 100644 index 000000000..3dbb1d8b2 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import java.util.Map; +import java.util.Properties; + +import org.apache.fluo.api.observer.Observer; +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; + +/** + * Provides read/write functions to the parameters map that is passed into an + * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related + * to PCJ exporting to a kafka topic. + * Remember: if doesn't count unless it is added to params + */ + +public class KafkaExportParameters extends ParametersBase { + + public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled"; + + public KafkaExportParameters(final Map params) { + super(params); + } + + /** + * @param isExportToKafka + * - {@code True} if the Fluo application should export + * to Kafka; otherwise {@code false}. + */ + public void setExportToKafka(final boolean isExportToKafka) { + setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka); + } + + /** + * @return {@code True} if the Fluo application should export to Kafka; otherwise + * {@code false}. Defaults to {@code false} if no value is present. + */ + public boolean isExportToKafka() { + return getBoolean(params, CONF_EXPORT_TO_KAFKA, false); + } + + /** + * Add the properties to the params, NOT keeping them separate from the other params. + * Guaranteed by Properties: Each key and its corresponding value in the property list is a string. 
+ * + * @param producerConfig + */ + public void setProducerConfig(final Properties producerConfig) { + for (Object key : producerConfig.keySet().toArray()) { + Object value = producerConfig.getProperty(key.toString()); + this.params.put(key.toString(), value.toString()); + } + } + + /** + * @return all the params (not just kafka producer Configuration) as a {@link Properties} + */ + public Properties getProducerConfig() { + Properties props = new Properties(); + for (Object key : params.keySet().toArray()) { + Object value = params.get(key.toString()); + props.put(key.toString(), value.toString()); + } + return props; + } + +} \ No newline at end of file diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java new file mode 100644 index 000000000..362efa7d5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * Incrementally exports SPARQL query results to Kafka topics. + */ +public class KafkaResultExporter implements IncrementalResultExporter { + private final KafkaProducer producer; + private static final Logger log = Logger.getLogger(KafkaResultExporter.class); + + /** + * Constructs an instance given a Kafka producer. + * + * @param producer + * for sending result set alerts to a broker. (not null) + * created and configured by {@link KafkaResultExporterFactory} + */ + public KafkaResultExporter(KafkaProducer producer) { + super(); + checkNotNull(producer, "Producer is required."); + this.producer = producer; + } + + /** + * Send the results to the topic using the queryID as the topicname + */ + @Override + public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { + checkNotNull(fluoTx); + checkNotNull(queryId); + checkNotNull(result); + try { + final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); + String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; + log.info(msg); + + // Send result on topic + ProducerRecord rec = new ProducerRecord(/* topicname= */ queryId, /* value= */ result); + // Can add a key if you need to: + // ProducerRecord(String topic, K key, V value) + producer.send(rec); + log.debug("producer.send(rec) completed"); + + } catch (final Throwable e) { + 
throw new ResultExportException("A result could not be exported to Kafka.", e); + } + } +} diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java new file mode 100644 index 000000000..9418720d2 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import org.apache.fluo.api.observer.Observer.Context; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import com.google.common.base.Optional; + +/** + * Creates instances of {@link KafkaResultExporter}. 
+ *

+ * Configure a Kafka producer by adding several required Key/values as described here: + * http://kafka.apache.org/documentation.html#producerconfigs + *

+ * Here is a simple example: + *

+ *     Properties producerConfig = new Properties();
+ *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ * 
+ * + * @see ProducerConfig + */ +public class KafkaResultExporterFactory implements IncrementalResultExporterFactory { + private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class); + @Override + public Optional build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap()); + log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); + if (exportParams.isExportToKafka()) { + // Setup Kafka connection + KafkaProducer producer = new KafkaProducer(exportParams.getProducerConfig()); + // Create the exporter + final IncrementalResultExporter exporter = new KafkaResultExporter(producer); + return Optional.of(exporter); + } else { + return Optional.absent(); + } + } + +} diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index bbca12829..a8fc6d922 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -23,11 +23,17 @@ import java.util.HashMap; import java.util.Map; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; @@ -38,11 +44,6 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.observer.AbstractObserver; -import org.apache.rya.accumulo.utils.VisibilitySimplifier; /** * Performs incremental result exporting to the configured destinations. @@ -69,6 +70,7 @@ public class QueryResultObserver extends AbstractObserver { private static final ImmutableSet factories = ImmutableSet.builder() .add(new RyaResultExporterFactory()) + .add(new KafkaResultExporterFactory()) .build(); /** @@ -90,6 +92,8 @@ public void init(final Context context) { for(final IncrementalResultExporterFactory builder : factories) { try { + log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); + final Optional exporter = builder.build(context); if(exporter.isPresent()) { exportersBuilder.add(exporter.get()); diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java new file mode 100644 index 000000000..1e5adbff6 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java @@ -0,0 +1,97 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export.rya; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.junit.Test; + +/** + * Tests the methods of {@link KafkaExportParameters}. + */ +public class KafkaExportParametersTest { + + @Test + public void writeParams() { + final Map params = new HashMap<>(); + + // Load some values into the params using the wrapper. + final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); + kafkaParams.setExportToKafka(true); + + // Ensure the params map has the expected values. + final Map expectedParams = new HashMap<>(); + expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true"); + assertTrue(kafkaParams.isExportToKafka()); + assertEquals(expectedParams, params); + + // now go the other way. 
+ expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false"); + kafkaParams.setExportToKafka(false); + assertFalse(kafkaParams.isExportToKafka()); + assertEquals(expectedParams, params); + } + @Test + public void writeParamsProps() { + final String key1 = "key1"; + final String value1First = "value1-preserve-this"; + final String value1Second = "value1prop"; + final String key2 = "歌古事学週文原問業間革社。"; // http://generator.lorem-ipsum.info/_chinese + final String value2 = "良治鮮猿性社費著併病極験。"; + + final Map params = new HashMap<>(); + // Make sure export key1 is NOT kept separate from producer config key1 + // This is a change, originally they were kept separate. + params.put(key1, value1First); + final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); + // Load some values into the properties using the wrapper. + Properties props = new Properties(); + props.put(key1, value1Second); + props.put(key2, value2); + kafkaParams.setProducerConfig(props); + Properties propsAfter = kafkaParams.getProducerConfig(); + assertEquals(props, propsAfter); + assertEquals(params, params); + assertEquals("Should change identical parameters key", params.get(key1), value1Second); + assertEquals("Props should have params's key", propsAfter.get(key1), value1Second); + assertNotNull("Should have props key", params.get(key2)); + } + + @Test + public void notConfigured() { + final Map params = new HashMap<>(); + + // Ensure an unconfigured parameters map will say kafka export is disabled. 
+ final KafkaExportParameters kafkaParams = new KafkaExportParameters(params); + assertFalse(kafkaParams.isExportToKafka()); + } + + @Test + public void testKafkaResultExporterFactory() { + KafkaResultExporterFactory factory = new KafkaResultExporterFactory(); + assertNotNull(factory); + } +} \ No newline at end of file diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index 6bb7105ca..b7adad6e9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -56,5 +56,39 @@ org.apache.fluo fluo-api
+ + + org.apache.kafka + kafka-clients + 0.10.1.0 + + + org.apache.kafka + kafka_2.11 + 0.10.1.0 + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.kafka + kafka_2.11 + 0.10.1.0 + test + + + + slf4j-log4j12 + org.slf4j + + + + + +
\ No newline at end of file diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java index 293426fc7..fa9a10e67 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java @@ -390,11 +390,6 @@ protected static AccumuloRdfConfiguration makeConfig(final String instanceName, return conf; } - /** - * Setup a Mini Fluo cluster that uses a temporary directory to store its data. - * - * @return A Mini Fluo cluster. - */ protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { // Setup the observers that will be used by the Fluo PCJ Application. final List observers = new ArrayList<>(); @@ -403,14 +398,9 @@ protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExis observers.add(new ObserverSpecification(JoinObserver.class.getName())); observers.add(new ObserverSpecification(FilterObserver.class.getName())); + // Set export details for exporting from Fluo to a Rya repository and a subscriber queue. final HashMap exportParams = new HashMap<>(); - final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); - ryaParams.setExportToRya(true); - ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); - ryaParams.setAccumuloInstanceName(instanceName); - ryaParams.setZookeeperServers(zookeepers); - ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); - ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); + setExportParameters(exportParams); // Configure the export observer to export new PCJ results to the mini accumulo cluster. 
final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); @@ -433,4 +423,20 @@ protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExis FluoFactory.newAdmin(config).initialize(new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) ); return FluoFactory.newMiniFluo(config); } + + /** + * Set export details for exporting from Fluo to a Rya repository and a subscriber queue. + * Override this if you have custom export destinations. + * + * @param exportParams + */ + protected void setExportParameters(final HashMap exportParams) { + final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); + ryaParams.setExportToRya(true); + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); + ryaParams.setAccumuloInstanceName(instanceName); + ryaParams.setZookeeperServers(zookeepers); + ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); + ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); + } } diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java new file mode 100644 index 000000000..10d2530ac --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Properties; +import java.util.Set; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import 
org.openrdf.query.impl.BindingImpl; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; + +/** + * Performs integration tests over the Fluo application geared towards Kafka PCJ exporting. + *

+ * These tests might be ignored so that they will not run as unit tests while building the application. + * Run this test from Maven command line: + * $ cd rya/extras/rya.pcj.fluo/pcj.fluo.integration + * $ mvn surefire:test -Dtest=KafkaExportIT + */ +public class KafkaExportIT extends ITBase { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private static final String TOPIC = "testTopic"; + private ZkUtils zkUtils; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkClient zkClient; + + + /** + * setup mini kafka and call the super to setup mini fluo + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources() + */ + @Override + public void setupMiniResources() throws Exception { + super.setupMiniResources(); + + zkServer = new EmbeddedZookeeper(); + String zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + System.out.println("setup kafka and fluo."); + } + + /** + * Test kafka without rya code to make sure kafka works in this environment. + * If this test fails then its a testing environment issue, not with Rya. 
+ * Source: https://github.com/asmaier/mini-kafka + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void embeddedKafkaTest() throws InterruptedException, IOException { + + // create topic + AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + // setup producer + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + KafkaProducer producer = new KafkaProducer(producerProps); + + // setup consumer + Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty("group.id", "group0"); + consumerProps.setProperty("client.id", "consumer0"); + consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(TOPIC)); + + // send message + ProducerRecord data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8)); + producer.send(data); + producer.close(); + + // starting consumer + ConsumerRecords records = consumer.poll(3000); + assertEquals(1, records.count()); + Iterator> recordIterator = records.iterator(); + ConsumerRecord record = recordIterator.next(); + System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); + assertEquals(42, (int) record.key()); + 
assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); + consumer.close(); + } + + @Test + public void newResultsExportedTest() throws Exception { + final String sparql = "SELECT ?customer ?worker ?city " + "{ " + "FILTER(?customer = ) " + "FILTER(?city = ) " + "?customer ?worker. " + "?worker ?city. " + "?worker . " + "}"; + + // Triples that will be streamed into Fluo after the PCJ has been created. + final Set streamedTriples = Sets.newHashSet(makeRyaStatement("http://Alice", "http://talksTo", "http://Bob"), makeRyaStatement("http://Bob", "http://livesIn", "http://London"), makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Alice", "http://talksTo", "http://Charlie"), makeRyaStatement("http://Charlie", "http://livesIn", "http://London"), makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Alice", "http://talksTo", "http://David"), makeRyaStatement("http://David", "http://livesIn", "http://London"), makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), makeRyaStatement("http://Eve", "http://livesIn", "http://Leeds"), makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"), + makeRyaStatement("http://Frank", "http://talksTo", "http://Alice"), makeRyaStatement("http://Frank", "http://livesIn", "http://London"), makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")); + + // The expected results of the SPARQL query once the PCJ has been computed. 
+ final Set expected = new HashSet<>(); + expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), new BindingImpl("city", new URIImpl("http://London")))); + expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Charlie")), new BindingImpl("city", new URIImpl("http://London")))); + expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://David")), new BindingImpl("city", new URIImpl("http://London")))); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + CreatePcj createPcj = new CreatePcj(); + String QueryIdIsTopicName = createPcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples, Optional. absent()); + + // Fetch the exported results from Accumulo once the observers finish working. 
+ fluo.waitForObservers(); + + /// KafkaConsumer consumer = makeConsumer(QueryIdIsTopicName); + KafkaConsumer consumer = makeConsumer(QueryIdIsTopicName); + + // starting consumer polling for messages + /// ConsumerRecords records = consumer.poll(3000); + ConsumerRecords records = consumer.poll(3000); + /// Iterator> recordIterator = records.iterator(); + Iterator> recordIterator = records.iterator(); + boolean allExpected = true; + ConsumerRecord unexpectedRecord = null; + while (recordIterator.hasNext()) { + ConsumerRecord record = recordIterator.next(); + System.out.printf("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString()); + boolean expectedThis = expected.contains(record.value()); + if (!expectedThis) { + System.out.println("This consumed record is not expected."); + unexpectedRecord = record; + } + allExpected = allExpected && expectedThis; + } + assertTrue("Must consume expected record: not expected:" + unexpectedRecord, allExpected); + assertNotEquals("Should get some results", 0, records.count()); + // assertEquals(42, (int) record.key()); + // assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); + + } + + /** + * A helper function for creating a {@link BindingSet} from an array of + * {@link Binding}s. + * + * @param bindings + * - The bindings to include in the set. (not null) + * @return A {@link BindingSet} holding the bindings. + */ + protected static BindingSet makeBindingSet(final Binding... 
bindings) { + return new VisibilityBindingSet(ITBase.makeBindingSet(bindings)); + } + + /** + * @param TopicName + * @return + */ + protected KafkaConsumer makeConsumer(String TopicName) { + // setup consumer + Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + // "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // to make sure the consumer starts from the beginning of the topic + /// KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(TopicName)); + return consumer; + } + + /** + * Add info about the kafka queue/topic to receive the export. + * Call super to get the Rya parameters. 
+ * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap) + */ + @Override + protected void setExportParameters(HashMap exportParams) { + // Get the defaults + super.setExportParameters(exportParams); + // Add the kafka parameters + final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); + kafkaParams.setExportToKafka(true); + // Configure the Producer + Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + // "org.apache.kafka.common.serialization.StringSerializer"); + kafkaParams.setProducerConfig(producerConfig); + } + + /** + * Close all the Kafka mini server and mini-zookeeper + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources() + */ + @Override + public void shutdownMiniResources() { + super.shutdownMiniResources(); + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } +} \ No newline at end of file diff --git a/extras/rya.prospector/pom.xml b/extras/rya.prospector/pom.xml index 0a3b6cfd9..952ab947b 100644 --- a/extras/rya.prospector/pom.xml +++ b/extras/rya.prospector/pom.xml @@ -75,6 +75,45 @@ under the License. + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.2,) + + compile + testCompile + + + + + + + + + org.codehaus.groovy + groovy-eclipse-compiler + [2.9.1-01,) + + add-groovy-build-paths + + + + + + + + + + From f2fbedd3f54142bd2463558cc91f03f29a1abc0e Mon Sep 17 00:00:00 2001 From: "David W. Lotts" Date: Tue, 4 Apr 2017 17:14:21 -0400 Subject: [PATCH 2/2] RYA-128 Review issues fixed. 
--- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 1 - .../{rya => kafka}/KafkaExportParameters.java | 8 ++-- .../{rya => kafka}/KafkaResultExporter.java | 6 +-- .../KafkaResultExporterFactory.java | 4 +- .../KryoVisibilityBindingSetSerializer.java} | 43 +++++++++++++++---- .../app/observers/QueryResultObserver.java | 2 +- .../export/rya/KafkaExportParametersTest.java | 6 ++- .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 1 - .../pcj/fluo/integration/KafkaExportIT.java | 20 +++++---- pom.xml | 18 ++++++++ 10 files changed, 79 insertions(+), 30 deletions(-) rename extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/{rya => kafka}/KafkaExportParameters.java (93%) rename extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/{rya => kafka}/KafkaResultExporter.java (94%) rename extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/{rya => kafka}/KafkaResultExporterFactory.java (97%) rename extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/{rya/BindingSetSerializer.java => kafka/KryoVisibilityBindingSetSerializer.java} (78%) diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index de510084f..343713c8c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -99,7 +99,6 @@ under the License. 
kafka_2.11 0.10.1.0 test - slf4j-log4j12 diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java similarity index 93% rename from extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java rename to extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java index 3dbb1d8b2..347a2e281 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; import java.util.Map; import java.util.Properties; @@ -62,7 +62,7 @@ public boolean isExportToKafka() { * * @param producerConfig */ - public void setProducerConfig(final Properties producerConfig) { + public void addAllProducerConfig(final Properties producerConfig) { for (Object key : producerConfig.keySet().toArray()) { Object value = producerConfig.getProperty(key.toString()); this.params.put(key.toString(), value.toString()); @@ -70,9 +70,11 @@ public void setProducerConfig(final Properties producerConfig) { } /** + * Collect all the properties + * * @return all the params (not just kafka producer Configuration) as a {@link Properties} */ - public Properties getProducerConfig() { + public Properties listAllConfig() { Properties props = new Properties(); for (Object key : params.keySet().toArray()) { Object value = params.get(key.toString()); diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java similarity index 94% rename from extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java rename to extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java index 362efa7d5..c40c5da96 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; import static com.google.common.base.Preconditions.checkNotNull; @@ -40,7 +40,7 @@ public class KafkaResultExporter implements IncrementalResultExporter { * * @param producer * for sending result set alerts to a broker. 
(not null) - * created and configured by {@link KafkaResultExporterFactory} + * Can be created and configured by {@link KafkaResultExporterFactory} */ public KafkaResultExporter(KafkaProducer producer) { super(); @@ -59,7 +59,7 @@ public void export(final TransactionBase fluoTx, final String queryId, final Vis try { final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; - log.info(msg); + log.trace(msg); // Send result on topic ProducerRecord rec = new ProducerRecord(/* topicname= */ queryId, /* value= */ result); diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java similarity index 97% rename from extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java rename to extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java index 9418720d2..995e9d9c5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; import org.apache.fluo.api.observer.Observer.Context; import org.apache.kafka.clients.producer.KafkaProducer; @@ -52,7 +52,7 @@ public Optional build(Context context) throws Increme log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka()); if (exportParams.isExportToKafka()) { // Setup Kafka connection - KafkaProducer producer = new KafkaProducer(exportParams.getProducerConfig()); + KafkaProducer producer = new KafkaProducer(exportParams.listAllConfig()); // Create the exporter final IncrementalResultExporter exporter = new KafkaResultExporter(producer); return Optional.of(exporter); diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java similarity index 78% rename from extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java rename to extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java index 7b35fecc0..d12233a8c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.rya.indexing.pcj.fluo.app.export.rya; +package org.apache.rya.indexing.pcj.fluo.app.export.kafka; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -43,7 +43,11 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -public class BindingSetSerializer implements Serializer, Deserializer { +/** + * Serialize and deserialize a VisibilityBindingSet using Kryo lib. Great for exporting results of queries. + * + */ +public class KryoVisibilityBindingSetSerializer implements Serializer, Deserializer { private static final ThreadLocal kryos = new ThreadLocal() { @Override protected Kryo initialValue() { @@ -52,20 +56,43 @@ protected Kryo initialValue() { }; }; + /** + * Deserialize a VisibilityBindingSet using Kryo lib. Exporting results of queries. + * If you don't want to use Kryo, here is an alternative: + * return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null); + * + * @param topic + * ignored + * @param data + * serialized bytes + * @return deserialized instance of VisibilityBindingSet + */ @Override public VisibilityBindingSet deserialize(String topic, byte[] data) { KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); Input input = new Input(new ByteArrayInputStream(data)); return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class); - // this is an alternative, or perhaps replace it: - // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null); } + /** + * Ignored. Nothing to configure. + */ @Override public void configure(Map configs, boolean isKey) { // Do nothing. } + /** + * Serialize a VisibilityBindingSet using Kryo lib. Exporting results of queries. 
+ * If you don't want to use Kryo, here is an alternative: + * return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8); + * + * @param topic + * ignored + * @param data + * serialize this instance + * @return Serialized form of VisibilityBindingSet + */ @Override public byte[] serialize(String topic, VisibilityBindingSet data) { KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); @@ -75,10 +102,11 @@ public byte[] serialize(String topic, VisibilityBindingSet data) { output.flush(); byte[] array = baos.toByteArray(); return array; - // this is an alternative, or perhaps replace it: - // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8); } + /** + * Ignored. Nothing to close. + */ @Override public void close() { // Do nothing. @@ -96,11 +124,10 @@ private static Value makeValue(final String valueString, final URI typeURI) { /** * De/Serialize a visibility binding set using the Kryo library. - * TODO rename this KryoSomething and change the package. 
* */ private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer { - private static final Logger log = Logger.getLogger(BindingSetSerializer.class); + private static final Logger log = Logger.getLogger(KryoVisibilityBindingSetSerializer.class); @Override public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) { log.debug("Serializer writing visBindingSet" + visBindingSet); diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index a8fc6d922..1238c1810 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -33,7 +33,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaResultExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java index 1e5adbff6..74193cfbf 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Properties; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory; import org.junit.Test; /** @@ -71,8 +73,8 @@ public void writeParamsProps() { Properties props = new Properties(); props.put(key1, value1Second); props.put(key2, value2); - kafkaParams.setProducerConfig(props); - Properties propsAfter = kafkaParams.getProducerConfig(); + kafkaParams.addAllProducerConfig(props); + Properties propsAfter = kafkaParams.listAllConfig(); assertEquals(props, propsAfter); assertEquals(params, params); assertEquals("Should change identical parameters key", params.get(key1), value1Second); diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index b7adad6e9..ab99ecdd7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -79,7 +79,6 @@ kafka_2.11 0.10.1.0 test - slf4j-log4j12 diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index 10d2530ac..5e12fac1f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -33,6 +33,8 @@ import java.util.Set; import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.logging.Log; +import 
org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -44,7 +46,7 @@ import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -77,6 +79,7 @@ * $ mvn surefire:test -Dtest=KafkaExportIT */ public class KafkaExportIT extends ITBase { + private static final Log logger = LogFactory.getLog(KafkaExportIT.class); private static final String ZKHOST = "127.0.0.1"; private static final String BROKERHOST = "127.0.0.1"; @@ -112,7 +115,7 @@ public void setupMiniResources() throws Exception { Time mock = new MockTime(); kafkaServer = TestUtils.createServer(config, mock); - System.out.println("setup kafka and fluo."); + logger.trace("setup kafka and fluo."); } /** @@ -125,7 +128,6 @@ public void setupMiniResources() throws Exception { */ @Test public void embeddedKafkaTest() throws InterruptedException, IOException { - // create topic AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); @@ -157,7 +159,7 @@ public void embeddedKafkaTest() throws InterruptedException, IOException { assertEquals(1, records.count()); Iterator> recordIterator = records.iterator(); ConsumerRecord record = recordIterator.next(); - System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); + logger.trace(String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), 
record.value())); assertEquals(42, (int) record.key()); assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); consumer.close(); @@ -206,10 +208,10 @@ public void newResultsExportedTest() throws Exception { ConsumerRecord unexpectedRecord = null; while (recordIterator.hasNext()) { ConsumerRecord record = recordIterator.next(); - System.out.printf("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString()); + logger.trace(String.format("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString())); boolean expectedThis = expected.contains(record.value()); if (!expectedThis) { - System.out.println("This consumed record is not expected."); + logger.trace("This consumed record is not expected."); unexpectedRecord = record; } allExpected = allExpected && expectedThis; @@ -244,7 +246,7 @@ protected KafkaConsumer makeConsumer(String Topic consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); // "org.apache.kafka.common.serialization.ByteArrayDeserializer"); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // to make sure the consumer starts from the beginning of the topic /// KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); @@ -270,9 +272,9 @@ protected void setExportParameters(HashMap exportParams) { Properties producerConfig = new Properties(); 
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer"); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); // "org.apache.kafka.common.serialization.StringSerializer"); - kafkaParams.setProducerConfig(producerConfig); + kafkaParams.addAllProducerConfig(producerConfig); } /** diff --git a/pom.xml b/pom.xml index e635e2586..3fa35bac8 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ under the License. 1.3.9-1 1.0-1 3.0.4 + 0.10.1.0 @@ -666,6 +667,23 @@ under the License. + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.apache.kafka + kafka_2.11 + ${kafka.version} + + + slf4j-log4j12 + org.slf4j + + +