From 36467a027fa95ba6587ada721155e1d0479e1808 Mon Sep 17 00:00:00 2001 From: Raul Kripalani Date: Thu, 3 Sep 2015 23:31:08 +0100 Subject: [PATCH] IGNITE-1370 Streamers: Implement multiple tuple extractor. --- .../apache/ignite/stream/StreamAdapter.java | 48 +++++++- .../stream/StreamMultipleTupleExtractor.java | 38 +++++++ .../ignite/stream/StreamTupleExtractor.java | 5 + .../ignite/stream/socket/SocketStreamer.java | 3 +- .../stream/socket/SocketStreamerSelfTest.java | 104 ++++++++++++++---- 5 files changed, 171 insertions(+), 27 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java index 97edcbb2df2f1..ffa0821d168ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.stream; import java.util.Map; + import org.apache.ignite.Ignite; import org.apache.ignite.IgniteDataStreamer; @@ -26,11 +27,22 @@ * streaming from different data sources. The purpose of adapters is to * convert different message formats into Ignite stream key-value tuples * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}. + *

+ * Two types of tuple extractors are supported: + *

    + *
  1. A single tuple extractor, which extracts either no or 1 tuple out of a message. See + * see {@link #setTupleExtractor(StreamTupleExtractor)}.
  2. + *
  3. A multiple tuple extractor, which is capable of extracting multiple tuples out of a single message, in the + * form of a {@link Map}. See {@link #setMultipleTupleExtractor(StreamMultipleTupleExtractor)}.
  4. + *
*/ public abstract class StreamAdapter { /** Tuple extractor. */ private StreamTupleExtractor extractor; + /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */ + private StreamMultipleTupleExtractor multipleTupleExtractor; + /** Streamer. */ private IgniteDataStreamer stmr; @@ -83,6 +95,20 @@ public void setTupleExtractor(StreamTupleExtractor extractor) { this.extractor = extractor; } + /** + * @return Provided tuple extractor (for 1:n cardinality). + */ + public StreamMultipleTupleExtractor getMultipleTupleExtractor() { + return multipleTupleExtractor; + } + + /** + * @param multipleTupleExtractor Extractor for 1:n tuple extraction. + */ + public void setMultipleTupleExtractor(StreamMultipleTupleExtractor multipleTupleExtractor) { + this.multipleTupleExtractor = multipleTupleExtractor; + } + /** * @return Provided {@link Ignite} instance. */ @@ -98,14 +124,28 @@ public void setIgnite(Ignite ignite) { } /** - * Converts given message to a tuple and adds it to the underlying streamer. + * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the + * underlying streamer. + *

+ * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence + * and the latter will be ignored. * * @param msg Message to convert. */ protected void addMessage(T msg) { - Map.Entry e = extractor.extract(msg); + if (multipleTupleExtractor == null) { + Map.Entry e = extractor.extract(msg); + + if (e != null) + stmr.addData(e); - if (e != null) - stmr.addData(e); + } else { + Map m = multipleTupleExtractor.extract(msg); + + if (m != null) + stmr.addData(m); + + } } + } \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java new file mode 100644 index 0000000000000..71ad45a2f789c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import java.util.Map; + +/** + * Stream tuple extractor to convert a single message to zero, one or many tuples. + *

+ * For cases where cardinality will always be 1:1 (or 0:1), you may consider {@link StreamTupleExtractor}. + * + * @see StreamTupleExtractor + */ +public interface StreamMultipleTupleExtractor { + + /** + * Extracts a set of key-values from a message. + * + * @param msg Message. + * @return Map containing resulting tuples. + */ + public Map extract(T msg); +} \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java index b6150abd5a1ab..aed7d8ab32839 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java @@ -21,6 +21,11 @@ /** * Stream tuple extractor to convert messages to Ignite key-value tuples. + *

+ * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may + * produce more than one tuple. + * + * @see StreamMultipleTupleExtractor */ public interface StreamTupleExtractor { /** diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java index 0d27af961f724..c89952dbdff65 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java @@ -141,7 +141,8 @@ public void setConverter(SocketMessageConverter converter) { * @throws IgniteException If failed. */ public void start() { - A.notNull(getTupleExtractor(), "tupleExtractor"); + A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null, + "tupleExtractor (single or multiple)"); A.notNull(getStreamer(), "streamer"); A.notNull(getIgnite(), "ignite"); A.ensure(threads > 0, "threads > 0"); diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java index 185599d4b80c6..8b05754ce297c 100644 --- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -43,6 +44,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; import org.apache.ignite.stream.StreamTupleExtractor; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -111,7 +113,7 @@ public void testSizeBasedDefaultConverter() throws Exception { Marshaller marsh = new JdkMarshaller(); for (int i = 0; i < CNT; i++) { - byte[] msg = marsh.marshal(new Tuple(i)); + byte[] msg = marsh.marshal(new Message(i)); os.write(msg.length >>> 24); os.write(msg.length >>> 16); @@ -125,21 +127,52 @@ public void testSizeBasedDefaultConverter() throws Exception { throw new IgniteException(e); } } - }); + }, true); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleEntriesFromOneMessage() throws Exception { + test(null, null, new Runnable() { + @Override public void run() { + try (Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + int[] values = new int[CNT]; + for (int i = 0; i < CNT; i++) { + values[i] = i; + } + + byte[] msg = marsh.marshal(new Message(values)); + + os.write(msg.length >>> 24); + os.write(msg.length >>> 16); + os.write(msg.length >>> 8); + os.write(msg.length); + + os.write(msg); + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, false); } /** * @throws Exception If failed. */ public void testSizeBasedCustomConverter() throws Exception { - SocketMessageConverter converter = new SocketMessageConverter() { - @Override public Tuple convert(byte[] msg) { + SocketMessageConverter converter = new SocketMessageConverter() { + @Override public Message convert(byte[] msg) { int i = (msg[0] & 0xFF) << 24; i |= (msg[1] & 0xFF) << 16; i |= (msg[2] & 0xFF) << 8; i |= msg[3] & 0xFF; - return new Tuple(i); + return new Message(i); } }; @@ -164,7 +197,7 @@ public void testSizeBasedCustomConverter() throws Exception { throw new IgniteException(e); } } - }); + }, true); } /** @@ -178,7 +211,7 @@ public void testDelimiterBasedDefaultConverter() throws Exception { Marshaller marsh = new JdkMarshaller(); for (int i = 0; i < CNT; i++) { - byte[] msg = marsh.marshal(new Tuple(i)); + byte[] msg = marsh.marshal(new Message(i)); os.write(msg); os.write(DELIM); @@ -188,7 +221,7 @@ public void testDelimiterBasedDefaultConverter() throws Exception { throw new IgniteException(e); } } - }); + }, true); } @@ -196,14 +229,14 @@ public void testDelimiterBasedDefaultConverter() throws Exception { * @throws Exception If failed. */ public void testDelimiterBasedCustomConverter() throws Exception { - SocketMessageConverter converter = new SocketMessageConverter() { - @Override public Tuple convert(byte[] msg) { + SocketMessageConverter converter = new SocketMessageConverter() { + @Override public Message convert(byte[] msg) { int i = (msg[0] & 0xFF) << 24; i |= (msg[1] & 0xFF) << 16; i |= (msg[2] & 0xFF) << 8; i |= msg[3] & 0xFF; - return new Tuple(i); + return new Message(i); } }; @@ -225,16 +258,17 @@ public void testDelimiterBasedCustomConverter() throws Exception { throw new IgniteException(e); } } - }); + }, true); } /** * @param converter Converter. * @param r Runnable.. */ - private void test(@Nullable SocketMessageConverter converter, @Nullable byte[] delim, Runnable r) throws Exception + private void test(@Nullable SocketMessageConverter converter, @Nullable byte[] delim, Runnable r, + boolean oneMessagePerTuple) throws Exception { - SocketStreamer sockStmr = null; + SocketStreamer sockStmr = null; Ignite ignite = grid(0); @@ -257,11 +291,24 @@ private void test(@Nullable SocketMessageConverter converter, @Nullable b sockStmr.setDelimiter(delim); - sockStmr.setTupleExtractor(new StreamTupleExtractor() { - @Override public Map.Entry extract(Tuple msg) { - return new IgniteBiTuple<>(msg.key, msg.val); - } - }); + if (oneMessagePerTuple) { + sockStmr.setTupleExtractor(new StreamTupleExtractor() { + @Override public Map.Entry extract(Message msg) { + return new IgniteBiTuple<>(msg.key, msg.val); + } + }); + } + else { + sockStmr.setMultipleTupleExtractor(new StreamMultipleTupleExtractor() { + @Override public Map extract(Message msg) { + Map answer = new HashMap<>(); + for (int value : msg.values) { + answer.put(value, Integer.toString(value)); + } + return answer; + } + }); + } if (converter != null) sockStmr.setConverter(converter); @@ -297,9 +344,9 @@ private void test(@Nullable SocketMessageConverter converter, @Nullable b } /** - * Tuple. + * Message. */ - private static class Tuple implements Serializable { + private static class Message implements Serializable { /** Serial version uid. */ private static final long serialVersionUID = 0L; @@ -309,12 +356,25 @@ private static class Tuple implements Serializable { /** Value. */ private final String val; + /** Multiple values. */ + private final int[] values; + /** * @param key Key. */ - Tuple(int key) { + Message(int key) { this.key = key; this.val = Integer.toString(key); + this.values = new int[0]; + } + + /** + * @param values Multiple values. + */ + Message(int[] values) { + this.key = -1; + this.val = null; + this.values = values; } } } \ No newline at end of file