Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' into debian

  • Loading branch information...
commit fb047703e54ac4a8bc0e09e03b00bf373adb389b 2 parents 3381b01 + 364925f
@eevans eevans authored
View
10 build.xml
@@ -24,6 +24,7 @@
<path id="java.classpath">
<fileset dir="${lib.dir}">
<include name="*.jar" />
+ <exclude name="junit*.jar"/>
</fileset>
</path>
@@ -70,7 +71,10 @@
<copy todir="${dist.dir}" file="README.textile" />
<copy todir="${dist.dir}" file="flewton.cfg" />
<copy todir="${dist.dir}/lib" flatten="true">
- <fileset dir="${basedir}/lib/" excludes="sources/" />
+ <fileset dir="${basedir}/lib/">
+ <exclude name="sources/"/>
+ <exclude name="junit*.jar"/>
+ </fileset>
</copy>
</target>
@@ -98,6 +102,10 @@
<delete dir="${build.dir}" />
<delete dir="${dist.dir}" />
</target>
+
+ <target name="scrub-lib">
+ <delete dir="${basedir}/lib/cachedir"/>
+ </target>
</project>
<!--
View
BIN  lib/uuid-3.2.jar
Binary file not shown
View
23 src/com/rackspace/flewton/AbstractRecord.java
@@ -35,9 +35,28 @@
import org.jboss.netty.buffer.ChannelBuffer;
public abstract class AbstractRecord {
+ private static final String FLOWS_TAG = "flows";
+
public List<Flow> flows = new ArrayList<Flow>();
-
+
public AbstractRecord(ChannelBuffer buffer) {
-
+ }
+
+ /**
+ * Serialize record to an XML string.
+ *
+ * @return XML representation of record.
+ */
+ public String toXmlString() {
+ StringBuilder out = new StringBuilder();
+ out.append('<').append(FLOWS_TAG).append('>');
+
+ for (Flow flow : flows) {
+ out.append(flow.toXmlString());
+ }
+
+ out.append("</").append(FLOWS_TAG).append('>');
+
+ return out.toString();
}
}
View
7 src/com/rackspace/flewton/CollectorHandler.java
@@ -73,6 +73,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
logger.warn(String.format("Netflow v%d is not supported", version));
if (logger.isDebugEnabled())
logPacket(e.getRemoteAddress(), buff.duplicate());
+ return;
} else
throw new UnsupportedOperationException(
String.format("Netflow v%d is not supported", version));
@@ -83,9 +84,9 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
}
// Send record to backends
- if (record != null)
- for (IBackend backend : backEnds)
- backend.write(record);
+ assert record != null;
+ for (IBackend backend : backEnds)
+ backend.write(record);
}
// dumps the contents of a buffer
View
39 src/com/rackspace/flewton/CollectorServer.java
@@ -68,6 +68,7 @@
}
private int remotePort;
+ private String listenAddress;
private static List<IBackend> createBackends(String[] backendNames, HierarchicalINIConfiguration config) throws ConfigError {
List<IBackend> backends = new ArrayList<IBackend>();
@@ -171,23 +172,32 @@ public void init(String[] arguments) throws ConfigurationException, ConfigError
public void start() {
DatagramChannelFactory f = new NioDatagramChannelFactory(Executors.newCachedThreadPool());
ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(f);
-
+
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new CollectorHandler());
}
});
-
+
// Disable broadcast
bootstrap.setOption("broadcast", false);
-
+
// Allow packets as large as 2048 bytes (default is 768)
- bootstrap.setOption("receiveBufferSizePredictorFactory",
- new FixedReceiveBufferSizePredictorFactory(2048));
+ bootstrap.setOption("receiveBufferSizePredictorFactory", new FixedReceiveBufferSizePredictorFactory(2048));
+
+ InetSocketAddress sockAddress = null;
- logger.info("Binding to UDP 0.0.0.0:{}", remotePort);
- bootstrap.bind(new InetSocketAddress(remotePort));
+ if (getListenAddress() == null) {
+ logger.info("Binding to UDP 0.0.0.0:{}", getListenPort());
+ sockAddress = new InetSocketAddress(getListenPort());
+ }
+ else {
+ logger.info("Binding to UDP {}:{}", getListenAddress(), getListenPort());
+ sockAddress = new InetSocketAddress(getListenAddress(), getListenPort());
+ }
+
+ bootstrap.bind(sockAddress);
}
/* jsvc */
@@ -200,4 +210,19 @@ public void destroy() {
}
+ public int getListenPort() {
+ return remotePort;
+ }
+
+ public void setListenPort(int remotePort) {
+ this.remotePort = remotePort;
+ }
+
+ public String getListenAddress() {
+ return listenAddress;
+ }
+
+ public void setListenAddress(String listenAddress) {
+ this.listenAddress = listenAddress;
+ }
}
View
53 src/com/rackspace/flewton/Flow.java
@@ -31,6 +31,11 @@
import java.net.InetAddress;
public class Flow {
+ private static final String FLOW_TAG = "flow";
+ private static final String ATTR_TAG = "attribute";
+ private static final String NAME_TAG = "name";
+ private static final String VALUE_TAG = "value";
+
public InetAddress sourceAddr;
public InetAddress destAddr;
public InetAddress nextHop;
@@ -47,6 +52,52 @@
public byte tos;
public short sourceAS;
public short destAS;
-
+
public long timestampCalculated;
+
+ private static String wrapAttribute(String name, Object value) {
+ return String.format(
+ "<%s><%s>%s</%s><%s>%s</%s></%s>",
+ ATTR_TAG,
+ NAME_TAG,
+ name,
+ NAME_TAG,
+ VALUE_TAG,
+ value,
+ VALUE_TAG,
+ ATTR_TAG);
+ }
+
+ /**
+ * Serialize flow to XML string.
+ *
+ * @return XML representation of flow.
+ */
+ public String toXmlString() {
+ StringBuilder out = new StringBuilder();
+
+ out.append('<').append(FLOW_TAG).append('>');
+
+ out.append(wrapAttribute("sourceAddr", sourceAddr.getHostAddress()));
+ out.append(wrapAttribute("destAddr", destAddr.getHostAddress()));
+ out.append(wrapAttribute("nextHop", nextHop.getHostAddress()));
+ out.append(wrapAttribute("snmpIn", snmpIn));
+ out.append(wrapAttribute("snmpOut", snmpOut));
+ out.append(wrapAttribute("numPackets", numPackets));
+ out.append(wrapAttribute("numOctets", numOctets));
+ out.append(wrapAttribute("timeFirst", timeFirst));
+ out.append(wrapAttribute("timeLast", timeLast));
+ out.append(wrapAttribute("sourcePort", sourcePort));
+ out.append(wrapAttribute("destPort", destPort));
+ out.append(wrapAttribute("tcpFlags", tcpFlags));
+ out.append(wrapAttribute("protocol", protocol));
+ out.append(wrapAttribute("tos", tos));
+ out.append(wrapAttribute("sourceAS", sourceAS));
+ out.append(wrapAttribute("destAS", destAS));
+ out.append(wrapAttribute("timestampCalculated", timestampCalculated));
+
+ out.append("</").append(FLOW_TAG).append('>');
+
+ return out.toString();
+ }
}
View
27 src/com/rackspace/flewton/backend/cassandra/UsageBackend.java
@@ -35,8 +35,7 @@
import com.rackspace.flewton.util.HostResolver;
import static com.rackspace.flewton.util.HostResolver.int2ByteBuffer;
-import static com.eaio.uuid.UUIDGen.createTime;
-import static com.eaio.uuid.UUIDGen.getClockSeqAndNode;
+import com.rackspace.flewton.util.UUIDGen;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
@@ -96,6 +95,7 @@ public UsageBackend(HierarchicalConfiguration config) throws ConfigError {
public void write(AbstractRecord record) {
final long ts = System.currentTimeMillis();
final Map<ByteBuffer, Map<String, List<Mutation>>> mutations = makeMutations(record, resolver, ts);
+ if (mutations.size() == 0) return; // nothing to do.
long retry = 0; // indicates how many MS to sleep before retrying.
// first try.
@@ -212,31 +212,10 @@ protected void write(Map<ByteBuffer, Map<String, List<Mutation>>> mutations)
* of a type 1 UUID (a time-based UUID).
*
* @param timeMillis
- * @return a type 1 UUID represented as a byte[]
- */
- protected static byte[] getTimeUUIDBytes(long timeMillis) {
- long msb = createTime(timeMillis), lsb = getClockSeqAndNode();
- byte[] uuidBytes = new byte[16];
-
- for (int i = 0; i < 8; i++) {
- uuidBytes[i] = (byte) (msb >>> 8 * (7 - i));
- }
- for (int i = 8; i < 16; i++) {
- uuidBytes[i] = (byte) (lsb >>> 8 * (7 - i));
- }
-
- return uuidBytes;
- }
-
- /**
- * Converts a milliseconds-since-epoch timestamp into the 16 byte representation
- * of a type 1 UUID (a time-based UUID).
- *
- * @param timeMillis
* @return a type 1 UUID represented as a ByteBuffer
*/
protected static ByteBuffer getTimeUUIDByteBuffer(long timeMillis) {
- return ByteBuffer.wrap(getTimeUUIDBytes(timeMillis));
+ return ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(timeMillis));
}
private Cassandra.Client borrowClient() {
View
74 src/com/rackspace/flewton/util/UUIDGen.java
@@ -36,8 +36,6 @@
/**
* The goods are here: www.ietf.org/rfc/rfc4122.txt.
- * NOTE: One instance of UUIDGen on a MacbookPro (2010) can generate 10k unique uuids per millisecond. If you need more,
- * use another instance of UUIDGen, which is guaranteed to return a different time given the same ms timestamp.
*/
public class UUIDGen
{
@@ -46,16 +44,41 @@
private static final long CLOCK = new Random(System.currentTimeMillis()).nextLong();
private static int clockOffsetTicker = 0;
+ // placement of this singleton is important. It needs to be instantiated *AFTER* the other statics.
+ private static final UUIDGen instance = new UUIDGen();
+
private final long clock;
- private long lastMs = 0;
- private long lastNano = 0;
+ private long lastNanos;
- public UUIDGen() {
+ private UUIDGen() {
+ // make sure someone didn't whack the clock by changing the order of instantiation.
+ if (CLOCK == 0) throw new RuntimeException("singleton instantiation is misplaced.");
+
clock = CLOCK + clockOffsetTicker++;
- lastNano = System.nanoTime() / 100;
}
- public long getClockSeqAndNode() {
+ /**
+ * Converts a milliseconds-since-epoch timestamp into the 16 byte representation
+ * of a type 1 UUID (a time-based UUID).
+ *
+ * @param timeMillis
+ * @return a type 1 UUID represented as a byte[]
+ */
+ public static byte[] getTimeUUIDBytes(long timeMillis) {
+ long msb = instance.createTime(timeMillis), lsb = instance.getClockSeqAndNode();
+ byte[] uuidBytes = new byte[16];
+
+ for (int i = 0; i < 8; i++) {
+ uuidBytes[i] = (byte) (msb >>> 8 * (7 - i));
+ }
+ for (int i = 8; i < 16; i++) {
+ uuidBytes[i] = (byte) (lsb >>> 8 * (7 - i));
+ }
+ return uuidBytes;
+ }
+
+ // todo: could cache value if we assume node doesn't change.
+ private synchronized long getClockSeqAndNode() {
long lsb = 0;
lsb |= (clock & 0x3f00000000000000L) >>> 56; // was 58?
lsb |= 0x0000000000000080;
@@ -64,23 +87,24 @@ public long getClockSeqAndNode() {
return lsb;
}
- public long createTime(long when) {
+ // needs to return two different values for the same when.
+ // we can generate at most 10k UUIDs per ms.
+ private synchronized long createTime(long when) {
long nanosSince = (when - START_EPOCH) * 10000;
- long nanos = System.nanoTime() / 100;
- // this trick breaks down if we try to ask for more than 9999 uuids per ms.
- if (lastMs == System.currentTimeMillis())
- nanosSince += nanos-lastNano;
+ if (nanosSince > lastNanos)
+ lastNanos = nanosSince;
else
- lastNano = nanos;
+ nanosSince = ++lastNanos;
+
long msb = 0L;
msb |= (0x00000000ffffffffL & nanosSince) << 32;
msb |= (0x0000ffff00000000L & nanosSince) >>> 16;
msb |= (0xffff000000000000L & nanosSince) >>> 48;
- msb &= 0xffffffffffff1fffL; // sets the version to 1.
- lastMs = when;
+ msb |= 0x0000000000001000L; // sets the version to 1.
return msb;
}
+ // todo: could exploit caching.
private static long makeNode() {
// ideally, we'd use the MAC address, but java doesn't expose that.
try {
@@ -91,31 +115,13 @@ private static long makeNode() {
for (int i = 0; i < Math.min(6, hash.length); i++)
node |= (0x00000000000000ff & (long)hash[i]) << (5-i)*8;
assert (0xff00000000000000L & node) == 0;
+ return node;
} catch (UnknownHostException ex) {
throw new RuntimeException(ex);
} catch (NoSuchAlgorithmException ex) {
throw new RuntimeException(ex);
}
- return 0;
- }
-
-
- public static void main(String args[]) {
- UUIDGen gen = new UUIDGen();
- int count = 1000;
- long[] times = new long[count];
- long start = System.currentTimeMillis();
- for (int i = 0; i < count; i++)
- times[i] = gen.createTime(System.currentTimeMillis());
- long stop = System.currentTimeMillis();
- for (int i = 1; i < count; i++)
- System.out.println(times[i]-times[i-1]);
- System.out.println(String.format("Generated in %d ms", stop-start));
-
-// for (int i = 0; i < 1000000; i++)
-// System.out.println(String.format("%d %d", System.currentTimeMillis(), System.nanoTime()));
}
-
}
// for the curious, here is how I generated START_EPOCH
View
95 test/com/rackspace/flewton/util/UUIDTests.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2010 Rackspace
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the University nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+package com.rackspace.flewton.util;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class UUIDTests {
+
+ @Test
+ public void testUUIDs() {
+ List<UUID> ids = new ArrayList<UUID>();
+
+ long now = 1292510156570L;
+
+ // make sure that each UUID is unique and greater than the last one generated.
+ UUID last = null;
+ for (int i = 0; i < 9999; i++) {
+ ByteBuffer bb = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(now));
+ UUID uuid = new UUID(bb.getLong(), bb.getLong());
+ assert uuid.version() == 1;
+ assert !ids.contains(uuid);
+ ids.add(uuid);
+ if (last != null)
+ assert uuid.compareTo(last) > 0;
+ last = uuid;
+ }
+
+ // even when times goes forward.
+ now++;
+ for (int i = 0; i < 9999; i++) {
+ ByteBuffer bb = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(now));
+ UUID uuid = new UUID(bb.getLong(), bb.getLong());
+ assert uuid.version() == 1;
+ assert !ids.contains(uuid);
+ ids.add(uuid);
+ assert uuid.compareTo(last) > 0;
+ last = uuid;
+ }
+
+ // even when time goes back on itself.
+ now--;
+ for (int i = 0; i < 9999; i++) {
+ ByteBuffer bb = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(now));
+ UUID uuid = new UUID(bb.getLong(), bb.getLong());
+ assert uuid.version() == 1;
+ assert !ids.contains(uuid);
+ ids.add(uuid);
+ assert uuid.compareTo(last) > 0;
+ last = uuid;
+ }
+
+ // even when time goes really backwards.
+ now--;
+ for (int i = 0; i < 9999; i++) {
+ ByteBuffer bb = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(now));
+ UUID uuid = new UUID(bb.getLong(), bb.getLong());
+ assert uuid.version() == 1;
+ assert !ids.contains(uuid);
+ ids.add(uuid);
+ assert uuid.compareTo(last) > 0;
+ last = uuid;
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.