Skip to content

Commit

Permalink
IGNITE-1370 Streamers: Implement multiple tuple extractor.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Sep 13, 2015
1 parent b736c46 commit 4d9734a
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 27 deletions.
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.stream; package org.apache.ignite.stream;


import java.util.Map; import java.util.Map;

import org.apache.ignite.Ignite; import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteDataStreamer;


Expand All @@ -26,11 +27,22 @@
* streaming from different data sources. The purpose of adapters is to * streaming from different data sources. The purpose of adapters is to
* convert different message formats into Ignite stream key-value tuples * convert different message formats into Ignite stream key-value tuples
* and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}. * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
* <p>
* Two types of tuple extractors are supported:
* <ol>
* <li>A single tuple extractor, which extracts either no or 1 tuple out of a message. See
* see {@link #setTupleExtractor(StreamTupleExtractor)}.</li>
* <li>A multiple tuple extractor, which is capable of extracting multiple tuples out of a single message, in the
* form of a {@link Map<K, V>}. See {@link #setMultipleTupleExtractor(StreamMultipleTupleExtractor)}.</li>
* </ol>
*/ */
public abstract class StreamAdapter<T, K, V> { public abstract class StreamAdapter<T, K, V> {
/** Tuple extractor. */ /** Tuple extractor. */
private StreamTupleExtractor<T, K, V> extractor; private StreamTupleExtractor<T, K, V> extractor;


/** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;

/** Streamer. */ /** Streamer. */
private IgniteDataStreamer<K, V> stmr; private IgniteDataStreamer<K, V> stmr;


Expand Down Expand Up @@ -83,6 +95,20 @@ public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
this.extractor = extractor; this.extractor = extractor;
} }


/**
* @return Provided tuple extractor (for 1:n cardinality).
*/
public StreamMultipleTupleExtractor<T, K, V> getMultipleTupleExtractor() {
return multipleTupleExtractor;
}

/**
* @param multipleTupleExtractor Extractor for 1:n tuple extraction.
*/
public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
this.multipleTupleExtractor = multipleTupleExtractor;
}

/** /**
* @return Provided {@link Ignite} instance. * @return Provided {@link Ignite} instance.
*/ */
Expand All @@ -98,14 +124,28 @@ public void setIgnite(Ignite ignite) {
} }


/** /**
* Converts given message to a tuple and adds it to the underlying streamer. * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
* underlying streamer.
* <p>
* If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
* and the latter will be ignored.
* *
* @param msg Message to convert. * @param msg Message to convert.
*/ */
protected void addMessage(T msg) { protected void addMessage(T msg) {
Map.Entry<K, V> e = extractor.extract(msg); if (multipleTupleExtractor == null) {
Map.Entry<K, V> e = extractor.extract(msg);

if (e != null)
stmr.addData(e);


if (e != null) } else {
stmr.addData(e); Map<K, V> m = multipleTupleExtractor.extract(msg);

if (m != null)
stmr.addData(m);

}
} }

} }
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.stream;

import java.util.Map;

/**
* Stream tuple extractor to convert a single message to zero, one or many tuples.
* <p>
* For cases where cardinality will always be 1:1 (or 0:1), you may consider {@link StreamTupleExtractor}.
*
* @see StreamTupleExtractor
*/
public interface StreamMultipleTupleExtractor<T, K, V> {

/**
* Extracts a set of key-values from a message.
*
* @param msg Message.
* @return Map containing resulting tuples.
*/
public Map<K, V> extract(T msg);
}
Expand Up @@ -21,6 +21,11 @@


/** /**
* Stream tuple extractor to convert messages to Ignite key-value tuples. * Stream tuple extractor to convert messages to Ignite key-value tuples.
* <p>
* Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
* produce more than one tuple.
*
* @see StreamMultipleTupleExtractor
*/ */
public interface StreamTupleExtractor<T, K, V> { public interface StreamTupleExtractor<T, K, V> {
/** /**
Expand Down
Expand Up @@ -141,7 +141,8 @@ public void setConverter(SocketMessageConverter<T> converter) {
* @throws IgniteException If failed. * @throws IgniteException If failed.
*/ */
public void start() { public void start() {
A.notNull(getTupleExtractor(), "tupleExtractor"); A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
"tupleExtractor (single or multiple)");
A.notNull(getStreamer(), "streamer"); A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite"); A.notNull(getIgnite(), "ignite");
A.ensure(threads > 0, "threads > 0"); A.ensure(threads > 0, "threads > 0");
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
Expand All @@ -43,6 +44,7 @@
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamTupleExtractor; import org.apache.ignite.stream.StreamTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -111,7 +113,7 @@ public void testSizeBasedDefaultConverter() throws Exception {
Marshaller marsh = new JdkMarshaller(); Marshaller marsh = new JdkMarshaller();


for (int i = 0; i < CNT; i++) { for (int i = 0; i < CNT; i++) {
byte[] msg = marsh.marshal(new Tuple(i)); byte[] msg = marsh.marshal(new Message(i));


os.write(msg.length >>> 24); os.write(msg.length >>> 24);
os.write(msg.length >>> 16); os.write(msg.length >>> 16);
Expand All @@ -125,21 +127,52 @@ public void testSizeBasedDefaultConverter() throws Exception {
throw new IgniteException(e); throw new IgniteException(e);
} }
} }
}); }, true);
}

/**
* @throws Exception If failed.
*/
public void testMultipleEntriesFromOneMessage() throws Exception {
test(null, null, new Runnable() {
@Override public void run() {
try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
Marshaller marsh = new JdkMarshaller();

int[] values = new int[CNT];
for (int i = 0; i < CNT; i++) {
values[i] = i;
}

byte[] msg = marsh.marshal(new Message(values));

os.write(msg.length >>> 24);
os.write(msg.length >>> 16);
os.write(msg.length >>> 8);
os.write(msg.length);

os.write(msg);
}
catch (IOException | IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}, false);
} }


/** /**
* @throws Exception If failed. * @throws Exception If failed.
*/ */
public void testSizeBasedCustomConverter() throws Exception { public void testSizeBasedCustomConverter() throws Exception {
SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() { SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
@Override public Tuple convert(byte[] msg) { @Override public Message convert(byte[] msg) {
int i = (msg[0] & 0xFF) << 24; int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16; i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8; i |= (msg[2] & 0xFF) << 8;
i |= msg[3] & 0xFF; i |= msg[3] & 0xFF;


return new Tuple(i); return new Message(i);
} }
}; };


Expand All @@ -164,7 +197,7 @@ public void testSizeBasedCustomConverter() throws Exception {
throw new IgniteException(e); throw new IgniteException(e);
} }
} }
}); }, true);
} }


/** /**
Expand All @@ -178,7 +211,7 @@ public void testDelimiterBasedDefaultConverter() throws Exception {
Marshaller marsh = new JdkMarshaller(); Marshaller marsh = new JdkMarshaller();


for (int i = 0; i < CNT; i++) { for (int i = 0; i < CNT; i++) {
byte[] msg = marsh.marshal(new Tuple(i)); byte[] msg = marsh.marshal(new Message(i));


os.write(msg); os.write(msg);
os.write(DELIM); os.write(DELIM);
Expand All @@ -188,22 +221,22 @@ public void testDelimiterBasedDefaultConverter() throws Exception {
throw new IgniteException(e); throw new IgniteException(e);
} }
} }
}); }, true);


} }


/** /**
* @throws Exception If failed. * @throws Exception If failed.
*/ */
public void testDelimiterBasedCustomConverter() throws Exception { public void testDelimiterBasedCustomConverter() throws Exception {
SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() { SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
@Override public Tuple convert(byte[] msg) { @Override public Message convert(byte[] msg) {
int i = (msg[0] & 0xFF) << 24; int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16; i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8; i |= (msg[2] & 0xFF) << 8;
i |= msg[3] & 0xFF; i |= msg[3] & 0xFF;


return new Tuple(i); return new Message(i);
} }
}; };


Expand All @@ -225,16 +258,17 @@ public void testDelimiterBasedCustomConverter() throws Exception {
throw new IgniteException(e); throw new IgniteException(e);
} }
} }
}); }, true);
} }


/** /**
* @param converter Converter. * @param converter Converter.
* @param r Runnable.. * @param r Runnable..
*/ */
private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
boolean oneMessagePerTuple) throws Exception
{ {
SocketStreamer<Tuple, Integer, String> sockStmr = null; SocketStreamer<Message, Integer, String> sockStmr = null;


Ignite ignite = grid(0); Ignite ignite = grid(0);


Expand All @@ -257,11 +291,24 @@ private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable b


sockStmr.setDelimiter(delim); sockStmr.setDelimiter(delim);


sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() { if (oneMessagePerTuple) {
@Override public Map.Entry<Integer, String> extract(Tuple msg) { sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
return new IgniteBiTuple<>(msg.key, msg.val); @Override public Map.Entry<Integer, String> extract(Message msg) {
} return new IgniteBiTuple<>(msg.key, msg.val);
}); }
});
}
else {
sockStmr.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<Message, Integer, String>() {
@Override public Map<Integer, String> extract(Message msg) {
Map<Integer, String> answer = new HashMap<>();
for (int value : msg.values) {
answer.put(value, Integer.toString(value));
}
return answer;
}
});
}


if (converter != null) if (converter != null)
sockStmr.setConverter(converter); sockStmr.setConverter(converter);
Expand Down Expand Up @@ -297,9 +344,9 @@ private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable b
} }


/** /**
* Tuple. * Message.
*/ */
private static class Tuple implements Serializable { private static class Message implements Serializable {
/** Serial version uid. */ /** Serial version uid. */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand All @@ -309,12 +356,25 @@ private static class Tuple implements Serializable {
/** Value. */ /** Value. */
private final String val; private final String val;


/** Multiple values. */
private final int[] values;

/** /**
* @param key Key. * @param key Key.
*/ */
Tuple(int key) { Message(int key) {
this.key = key; this.key = key;
this.val = Integer.toString(key); this.val = Integer.toString(key);
this.values = new int[0];
}

/**
* @param values Multiple values.
*/
Message(int[] values) {
this.key = -1;
this.val = null;
this.values = values;
} }
} }
} }

0 comments on commit 4d9734a

Please sign in to comment.