Navigation Menu

Skip to content

Commit

Permalink
Support for IP fragments
Browse files Browse the repository at this point in the history
  • Loading branch information
wnagele committed May 3, 2015
1 parent e4a1046 commit 75143d2
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 20 deletions.
Expand Up @@ -30,7 +30,12 @@ protected Packet createPacket() {
}

@Override
protected boolean isReassemble() {
protected boolean isReassembleDatagram() {
return true;
}

@Override
protected boolean isReassembleTcp() {
return true;
}

Expand All @@ -41,10 +46,15 @@ protected boolean isPush() {

@Override
protected void processPacketPayload(Packet packet, byte[] payload) {
String protocol = (String)packet.get(Packet.PROTOCOL);

if (!PcapReader.PROTOCOL_UDP.equals(protocol) && !PcapReader.PROTOCOL_TCP.equals(protocol))
return;

DnsPacket dnsPacket = (DnsPacket)packet;

if (DNS_PORT == (Integer)packet.get(Packet.SRC_PORT) || DNS_PORT == (Integer)packet.get(Packet.DST_PORT)) {
if (PROTOCOL_TCP.equals(packet.get(Packet.PROTOCOL)) &&
if (PROTOCOL_TCP.equals(protocol) &&
payload.length > 2) // TODO Support DNS responses with multiple messages (as used for XFRs)
payload = Arrays.copyOfRange(payload, 2, payload.length); // First two bytes denote the size of the DNS message, ignore them
try {
Expand Down
Expand Up @@ -19,7 +19,12 @@ protected Packet createPacket() {
}

@Override
protected boolean isReassemble() {
protected boolean isReassembleDatagram() {
return true;
}

@Override
protected boolean isReassembleTcp() {
return true;
}

Expand Down
Expand Up @@ -52,7 +52,12 @@ protected Packet createPacket() {
}

@Override
protected boolean isReassemble() {
protected boolean isReassembleDatagram() {
return true;
}

@Override
protected boolean isReassembleTcp() {
return true;
}

Expand All @@ -63,12 +68,16 @@ protected boolean isPush() {

@Override
protected void processPacketPayload(Packet packet, final byte[] payload) {
String protocol = (String)packet.get(Packet.PROTOCOL);

if (!PcapReader.PROTOCOL_TCP.equals(protocol))
return;

HttpPacket httpPacket = (HttpPacket)packet;
Integer srcPort = (Integer)packet.get(Packet.SRC_PORT);
Integer dstPort = (Integer)packet.get(Packet.DST_PORT);
if ((HTTP_PORT == srcPort || HTTP_PORT == dstPort) &&
packet.containsKey(Packet.REASSEMBLED_FRAGMENTS) &&
PROTOCOL_TCP.equals(packet.get(Packet.PROTOCOL))) {
packet.containsKey(Packet.REASSEMBLED_TCP_FRAGMENTS)) {
final SessionInputBuffer inBuf = new AbstractSessionInputBuffer() {
{
init(new ByteArrayInputStream(payload), 1024, params);
Expand Down
142 changes: 133 additions & 9 deletions hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/PcapReader.java
Expand Up @@ -3,6 +3,7 @@
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.math.BigDecimal;
Expand All @@ -17,6 +18,7 @@
import com.google.common.collect.TreeMultimap;
import com.google.common.primitives.Bytes;

import net.ripe.hadoop.pcap.packet.Datagram;
import net.ripe.hadoop.pcap.packet.Flow;
import net.ripe.hadoop.pcap.packet.Packet;

Expand Down Expand Up @@ -50,6 +52,12 @@ public class PcapReader implements Iterable<Packet> {
public static final int IPV6_SRC_OFFSET = 8; // relative to start of IP header
public static final int IP_DST_OFFSET = 16; // relative to start of IP header
public static final int IPV6_DST_OFFSET = 24; // relative to start of IP header
public static final int IP_ID_OFFSET = 4; // relative to start of IP header
public static final int IPV6_ID_OFFSET = 4; // relative to start of IP header
public static final int IP_FLAGS_OFFSET = 6; // relative to start of IP header
public static final int IPV6_FLAGS_OFFSET = 3; // relative to start of IP header
public static final int IP_FRAGMENT_OFFSET = 6; // relative to start of IP header
public static final int IPV6_FRAGMENT_OFFSET = 2; // relative to start of IP header
public static final int UDP_HEADER_SIZE = 8;
public static final int PROTOCOL_HEADER_SRC_PORT_OFFSET = 0;
public static final int PROTOCOL_HEADER_DST_PORT_OFFSET = 2;
Expand All @@ -59,6 +67,7 @@ public class PcapReader implements Iterable<Packet> {
public static final String PROTOCOL_ICMP = "ICMP";
public static final String PROTOCOL_TCP = "TCP";
public static final String PROTOCOL_UDP = "UDP";
public static final String PROTOCOL_FRAGMENT = "Fragment";

private final DataInputStream is;
private Iterator<Packet> iterator;
Expand All @@ -72,6 +81,7 @@ public class PcapReader implements Iterable<Packet> {
private boolean reverseHeaderByteOrder = false;

private Multimap<Flow, SequencePayload> flows = TreeMultimap.create();
private Multimap<Datagram, DatagramPayload> datagrams = TreeMultimap.create();

public byte[] pcapHeader;
public byte[] pcapPacketHeader;
Expand Down Expand Up @@ -179,25 +189,61 @@ private Packet nextPacket() {

if (ipProtocolHeaderVersion == 4 || ipProtocolHeaderVersion == 6) {
int ipHeaderLen = getInternetProtocolHeaderLength(packetData, ipProtocolHeaderVersion, ipStart);
packet.put(Packet.IP_HEADER_LENGTH, ipHeaderLen);

int totalLength = 0;
if (ipProtocolHeaderVersion == 4) {
buildInternetProtocolV4Packet(packet, packetData, ipStart);
totalLength = PcapReaderUtil.convertShort(packetData, ipStart + IP_TOTAL_LEN_OFFSET);
} else if (ipProtocolHeaderVersion == 6) {
buildInternetProtocolV6Packet(packet, packetData, ipStart);
ipHeaderLen += buildInternetProtocolV6ExtensionHeaderFragment(packet, packetData, ipStart);
int payloadLength = PcapReaderUtil.convertShort(packetData, ipStart + IPV6_PAYLOAD_LEN_OFFSET);
totalLength = payloadLength + IPV6_HEADER_SIZE;
}
packet.put(Packet.IP_HEADER_LENGTH, ipHeaderLen);

if ((Boolean)packet.get(Packet.FRAGMENT)) {
if (isReassembleDatagram()) {
Datagram datagram = packet.getDatagram();
Long fragmentOffset = (Long)packet.get(Packet.FRAGMENT_OFFSET);
byte[] fragmentPacketData = Arrays.copyOfRange(packetData, ipStart + ipHeaderLen, ipStart + totalLength);
DatagramPayload payload = new DatagramPayload(fragmentOffset, fragmentPacketData);
datagrams.put(datagram, payload);

if ((Boolean)packet.get(Packet.LAST_FRAGMENT)) {
Collection<DatagramPayload> datagramPayloads = datagrams.removeAll(datagram);
if (datagramPayloads != null && datagramPayloads.size() > 0) {
packet.put(Packet.REASSEMBLED_DATAGRAM_FRAGMENTS, datagramPayloads.size());
packetData = Arrays.copyOfRange(packetData, 0, ipStart + ipHeaderLen); // Start re-fragmented packet with header from current packet
totalLength = ipHeaderLen;
DatagramPayload prev = null;
for (DatagramPayload datagramPayload : datagramPayloads) {
if (prev != null && !datagramPayload.linked(prev)) {
LOG.warn("Broken datagram chain between " + datagramPayload + " and " + prev + ". Returning empty payload.");
packetData = new byte[0];
break;
}
packetData = Bytes.concat(packetData, datagramPayload.payload);
totalLength += datagramPayload.payload.length;
prev = datagramPayload;
}
}
} else {
packet.put(Packet.PROTOCOL, PROTOCOL_FRAGMENT);
}
} else {
packet.put(Packet.PROTOCOL, PROTOCOL_FRAGMENT);
}
}

String protocol = (String)packet.get(Packet.PROTOCOL);
int payloadDataStart = ipStart + ipHeaderLen;
int payloadLength = totalLength - ipHeaderLen;
byte[] packetPayload = readPayload(packetData, payloadDataStart, payloadLength);
if (PROTOCOL_UDP == protocol ||
PROTOCOL_TCP == protocol) {

byte[] packetPayload = buildTcpAndUdpPacket(packet, packetData, ipProtocolHeaderVersion, ipStart, ipHeaderLen, totalLength);
packetPayload = buildTcpAndUdpPacket(packet, packetData, ipProtocolHeaderVersion, ipStart, ipHeaderLen, totalLength);

if (isReassemble() && PROTOCOL_TCP == protocol) {
if (isReassembleTcp() && PROTOCOL_TCP == protocol) {
Flow flow = packet.getFlow();

if (packetPayload.length > 0) {
Expand All @@ -209,7 +255,7 @@ private Packet nextPacket() {
if ((Boolean)packet.get(Packet.TCP_FLAG_FIN) || (isPush() && (Boolean)packet.get(Packet.TCP_FLAG_PSH))) {
Collection<SequencePayload> fragments = flows.removeAll(flow);
if (fragments != null && fragments.size() > 0) {
packet.put(Packet.REASSEMBLED_FRAGMENTS, fragments.size());
packet.put(Packet.REASSEMBLED_TCP_FRAGMENTS, fragments.size());
packetPayload = new byte[0];
SequencePayload prev = null;
for (SequencePayload seqPayload : fragments) {
Expand All @@ -224,8 +270,9 @@ private Packet nextPacket() {
}
}
}
processPacketPayload(packet, packetPayload);
}
packet.put(Packet.LEN, packetPayload.length);
processPacketPayload(packet, packetPayload);
}

return packet;
Expand All @@ -235,7 +282,11 @@ protected Packet createPacket() {
return new Packet();
}

protected boolean isReassemble() {
protected boolean isReassembleDatagram() {
return false;
}

protected boolean isReassembleTcp() {
return false;
}

Expand Down Expand Up @@ -322,6 +373,23 @@ private int getTcpHeaderLength(byte[] packet, int tcpStart) {
}

private void buildInternetProtocolV4Packet(Packet packet, byte[] packetData, int ipStart) {
long id = new Long(PcapReaderUtil.convertShort(packetData, ipStart + IP_ID_OFFSET));
packet.put(Packet.ID, id);

int flags = packetData[ipStart + IP_FLAGS_OFFSET] & 0xE0;
packet.put(Packet.IP_FLAGS_DF, (flags & 0x40) == 0 ? false : true);
packet.put(Packet.IP_FLAGS_MF, (flags & 0x20) == 0 ? false : true);

long fragmentOffset = (PcapReaderUtil.convertShort(packetData, ipStart + IP_FRAGMENT_OFFSET) & 0x1FFF) * 8;
packet.put(Packet.FRAGMENT_OFFSET, fragmentOffset);

if ((flags & 0x20) != 0 || fragmentOffset != 0) {
packet.put(Packet.FRAGMENT, true);
packet.put(Packet.LAST_FRAGMENT, ((flags & 0x20) == 0 && fragmentOffset != 0));
} else {
packet.put(Packet.FRAGMENT, false);
}

int ttl = packetData[ipStart + IP_TTL_OFFSET] & 0xFF;
packet.put(Packet.TTL, ttl);

Expand Down Expand Up @@ -349,6 +417,31 @@ private void buildInternetProtocolV6Packet(Packet packet, byte[] packetData, int
packet.put(Packet.DST, dst);
}

private int buildInternetProtocolV6ExtensionHeaderFragment(Packet packet, byte[] packetData, int ipStart) {
if (PROTOCOL_FRAGMENT.equals((String)packet.get(Packet.PROTOCOL))) {
long id = PcapReaderUtil.convertUnsignedInt(packetData, ipStart + IPV6_HEADER_SIZE + IPV6_ID_OFFSET);
packet.put(Packet.ID, id);

int flags = packetData[ipStart + IPV6_HEADER_SIZE + IPV6_FLAGS_OFFSET] & 0x7;
packet.put(Packet.IPV6_FLAGS_M, (flags & 0x1) == 0 ? false : true);

long fragmentOffset = PcapReaderUtil.convertShort(packetData, ipStart + IPV6_HEADER_SIZE + IPV6_FRAGMENT_OFFSET) & 0xFFF8;
packet.put(Packet.FRAGMENT_OFFSET, fragmentOffset);

packet.put(Packet.FRAGMENT, true);
packet.put(Packet.LAST_FRAGMENT, ((flags & 0x1) == 0 && fragmentOffset != 0));

int protocol = packetData[ipStart + IPV6_HEADER_SIZE];
packet.put(Packet.PROTOCOL, PcapReaderUtil.convertProtocolIdentifier(protocol)); // Change protocol to value from fragment header

return 8; // Return fragment header extension length
}

// Not a fragment
packet.put(Packet.FRAGMENT, false);
return 0;
}

/*
* packetData is the entire layer 2 packet read from pcap
* ipStart is the start of the IP packet in packetData
Expand Down Expand Up @@ -399,7 +492,6 @@ private byte[] buildTcpAndUdpPacket(Packet packet, byte[] packetData, int ipProt
int payloadDataStart = ipStart + ipHeaderLen + tcpOrUdpHeaderSize;
int payloadLength = totalLength - ipHeaderLen - tcpOrUdpHeaderSize;
byte[] data = readPayload(packetData, payloadDataStart, payloadLength);
packet.put(Packet.LEN, data.length);
return data;
}

Expand Down Expand Up @@ -516,4 +608,36 @@ public String toString() {
.toString();
}
}

private class DatagramPayload implements Comparable<DatagramPayload> {
private Long offset;
private byte[] payload;

public DatagramPayload(Long offset, byte[] payload) {
this.offset = offset;
this.payload = payload;
}

@Override
public int compareTo(DatagramPayload o) {
return ComparisonChain.start().compare(offset, o.offset)
.compare(payload.length, o.payload.length)
.result();
}

public boolean linked(DatagramPayload o) {
if ((offset + payload.length) == o.offset)
return true;
if ((o.offset + o.payload.length) == offset)
return true;
return false;
}

@Override
public String toString() {
return Objects.toStringHelper(this.getClass()).add("offset", offset)
.add("len", payload.length)
.toString();
}
}
}
Expand Up @@ -14,6 +14,7 @@ public class PcapReaderUtil {
protocols.put(1, PcapReader.PROTOCOL_ICMP);
protocols.put(6, PcapReader.PROTOCOL_TCP);
protocols.put(17, PcapReader.PROTOCOL_UDP);
protocols.put(44, PcapReader.PROTOCOL_FRAGMENT); // Using IPv4 fragment protocol number across protocols (see http://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml)
}

public static long convertInt(byte[] data) {
Expand Down
@@ -0,0 +1,37 @@
package net.ripe.hadoop.pcap.packet;

import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;

public class Datagram implements Comparable<Datagram> {
private String src;
private String dst;
private Long id;
private String protocol;

public Datagram(String src, String dst, Long id, String protocol) {
this.src = src;
this.dst = dst;
this.id = id;
this.protocol = protocol;
}

@Override
public int compareTo(Datagram o) {
return ComparisonChain.start()
.compare(src, o.src)
.compare(dst, o.dst)
.compare(id, o.id)
.compare(protocol, o.protocol)
.result();
}

@Override
public String toString() {
return Objects.toStringHelper(this.getClass()).add("src", src)
.add("dst", dst)
.add("id", id)
.add("protocol", protocol)
.toString();
}
}

0 comments on commit 75143d2

Please sign in to comment.