In [11]:
import scapy.all
import struct
import socket

class Address:
    """A transport endpoint: an ``ip`` value paired with a ``port`` value.

    Instances are hashable and totally ordered by ``(ip, port)``, so they can
    be used as dictionary keys and sorted.  Ordering uses the raw attribute
    values; if ``ip`` is a dotted-quad string the order is therefore
    lexicographic, not numeric.
    """

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port

    def _key(self):
        # Single source of truth for hashing, equality and ordering.
        return (self.ip, self.port)

    def __hash__(self):
        return hash(self._key())

    def __lt__(self, rhs):
        # Returning NotImplemented (instead of touching rhs.ip) lets Python
        # raise a clean TypeError for comparisons with unrelated types.
        if not isinstance(rhs, Address):
            return NotImplemented
        return self._key() < rhs._key()

    def __eq__(self, rhs):
        # Comparing with a non-Address must answer "not equal", not crash
        # with AttributeError (e.g. `addr == None` or dict lookups that mix
        # key types).
        if not isinstance(rhs, Address):
            return NotImplemented
        return self._key() == rhs._key()

    def __str__(self):
        # Fixed-width columns so printed flows line up.
        return f"{self.ip:>15}:{self.port:<5}"

class Flow:
    """The source and destination endpoints of a single packet.

    Equality and hashing are direction-insensitive: they are based on
    ``ordered``, the sorted pair of endpoints, so a packet and its reply map
    to the same dictionary key.  ``<`` is direction-sensitive: it compares
    ``src`` first and falls back to ``dst`` when the sources are equal.
    """

    def __init__(self, pkt):
        # The ports are read through the IP layer, exactly as the original
        # code did.  NOTE(review): this presumably relies on scapy forwarding
        # unknown attributes to the payload layer - confirm.
        ip_layer = pkt['IP']
        self.src = Address(ip_layer.src, ip_layer.sport)
        self.dst = Address(ip_layer.dst, ip_layer.dport)

    def __hash__(self):
        return hash(self.ordered)

    def __eq__(self, rhs):
        if not isinstance(rhs, Flow):
            return NotImplemented
        return self.ordered == rhs.ordered

    def __lt__(self, rhs):
        # The endpoints are objects, not tuples, so they cannot be indexed
        # with [0]/[1]; delegate to the endpoints' own comparison operators.
        if not isinstance(rhs, Flow):
            return NotImplemented
        if self.src == rhs.src:
            return self.dst < rhs.dst
        return self.src < rhs.src

    def __str__(self):
        return f"{self.src} ==> {self.dst}"

    @property
    def ordered(self):
        """Both endpoints as a sorted tuple (direction-independent key)."""
        return tuple(sorted([self.src, self.dst]))


In [8]:
# TCP flag letters mapped to their bit masks, lowest bit first:
# F=0x01, S=0x02, R=0x04, P=0x08, A=0x10, U=0x20, E=0x40, C=0x80.
FLAGS = {letter: 1 << bit for bit, letter in enumerate("FSRPAUEC")}

# Preferred letter order for a normalised flag string.  Only commented-out
# code refers to it in the cells visible here.
FLAGS_ORDER = "SAFCR"

# Transition tables of the form {current_state: {flag_string: next_state}}.
# In the cells visible here they are only referenced from commented-out code
# in TCPSession.handle_packet; the live handlers hard-code their transitions.
#
# NOTE(review): several target states have no entry of their own
# ('TIME_WAIT' in CLIENT_STATES; 'FIN_WAIT_1' and 'CLOSED' in SERVER_STATES),
# so a lookup such as CLIENT_STATES[state] would raise KeyError once one of
# those states is reached.

# Client-side transitions, starting from 'CLOSED'.
CLIENT_STATES = {
    'CLOSED': {
        'S': 'SYN_SENT',
    },
    'SYN_SENT': {
        'SA': 'ESTABLISHED',
        # deal with timeout to go back to CLOSED
    },
    'ESTABLISHED': {
        'F': 'FIN_WAIT_1', # (FIN comes from client)
    },
    'FIN_WAIT_1' : {
        'A': 'FIN_WAIT_2', # passive close, client sends nothing
        'F': 'CLOSING', # simultaneous close, client sends ACK
        'FA': 'TIME_WAIT', # FIN and ACK in one packet, same as simultaneous close, client sends ACK
    },
    'CLOSING': {
        'A': 'TIME_WAIT', # client sends nothing
    },
    'FIN_WAIT_2': {
        'F': 'TIME_WAIT', # could wait for timeout, but all transitions to this state lead to CLOSED
    },
}

# Server-side transitions, starting from 'LISTEN'.
SERVER_STATES = {
    'LISTEN': {
        'S' : 'SYN_RCVD', # SYN
    },
    'SYN_RCVD': {
        'A': 'ESTABLISHED', # ACK
        'R': 'LISTEN', # RESET
        'F': 'FIN_WAIT_1' # active close
    },
    'ESTABLISHED': {
        'F': 'CLOSE_WAIT', # server sends ACK
    },
    'CLOSE_WAIT': {
        'F': 'LAST_ACK', # (FIN comes from server) server sends FIN
    },
    'LAST_ACK': {
        'A': 'CLOSED',
    }
}


In [26]:
def is_syn_pkt(pkt):
	"""Return True when ``pkt`` has a TCP layer whose flags are exactly SYN.

	Packets without a TCP layer, and packets that carry SYN together with
	any other flag (for example SYN|ACK), yield False.
	"""
	if 'TCP' not in pkt:
		return False
	return pkt['TCP'].flags == FLAGS['S']

class TCPSession:
	def __init__(self, pkt):
		if not 'TCP' in pkt:
			raise Exception("Not a TCP Packet")
		if not is_syn_pkt(pkt):
			raise Exception("Not valid SYN")

		self.flow = Flow(pkt) # client => server
		self.packets = []

		# 0 is now, 1 is the future Flags
		self.server_state = "LISTEN"
		self.client_state = "CLOSED"

		self.handle_packet(pkt)
#       self.server_close_time = -1.0
#       self.client_close_time = -1.0
#       self.fin_wait_time = -1.0

	@property
	def client(self):
		return self.flow.src
	@property
	def server(self):
		return self.flow.dst

	def add(self, pkt):
		if not 'TCP' in pkt:
			raise Exception("Not a TCP Packet")

		# determine in what context we are handling this packet
#        flow = Flow(pkt)
#        if flow.forward != self.flow and flow.reverse != self.flow:
#            raise Exception("Not a valid packet for this model")

		self.handle_packet(pkt)
#		if flow.dst == self.server:
#			v =  self.add_client_pkt(pkt)
#			if self.is_fin_wait():
#				self.fin_wait_time = pkt.time
#			return v
#		else:
#			v = self.add_server_pkt(pkt)
#			if self.is_fin_wait():
#				self.fin_wait_time = pkt.time
#			return v
#		raise Exception("Not a valid packet for this model")

	def handle_packet(self, pkt):
#		flags = pkt['TCP'].flags
#		flags = ''.join(f for f,x in FLAGS.items() if x & flags)
#		flags = ''.join(f for f in FLAGS_ORDER if f in flags)

		flow = Flow(pkt)

		if flow.dst == self.server:
			state = self.handle_client_packet(pkt)
		else:
			state = self.handle_server_packet(pkt)

		if self.is_fin_wait():
			self.fin_wait_time = pkt.time

		self.packets.append(pkt)

		return state
#		self.client_state = CLIENT_STATES[self.client_state].get(flags) or self.client_state
#		self.server_state = SERVER_STATES[self.server_state].get(flags) or self.server_state
		

	def active_close(self):
		return (self.client_state == self.server_state and self.server_state == "CLOSED")

	def passive_close(self):
		return (self.client_state == "LAST_ACK" and self.server_state == "CLOSE_WAIT")

	def is_closed(self):
		return self.passive_close() or self.active_close()

	def is_fin_wait(self):
		return self.client_state.find("FIN_WAIT") > -1 or self.server_state.find("FIN_WAIT") > -1

	def build_flags(self, sflags):
		return sum([FLAGS[i] for i in sflags])

	def handle_client_packet(self, pkt):
		flags = pkt['TCP'].flags
		client_got_closed = False
		server_got_closed = False

		if flags == self.build_flags("R"):
			self.client_state = "CLOSED"
			self.server_state = "CLOSED"
			server_got_closed = True
			client_got_closed = True

		elif flags == self.build_flags("RA"):
			self.client_state = "CLOSED"
			self.server_state = "CLOSED"
			server_got_closed = True
			client_got_closed = True
		elif flags == self.build_flags("S"):
			self.server_state = "SYN_SENT"

		elif self.client_state == "SYN_SENT":
			if flags & self.build_flags("A") > 0:
				self.client_state = "ESTABLISHED"
				self.server_state = "ESTABLISHED"
			else:
				self.client_state = "CLOSED"
				server_got_closed = pkt.time
				client_got_closed = pkt.time
				return self.is_closed()

		elif self.client_state == "SYN_SENT":
			if flags & self.build_flags("SA") > 0:
				self.client_state = "SYN_RCVD"

		elif self.client_state == "SYN_RECVD" and\
				flags & self.build_flags("F") > 0:
				self.client_state = "FIN_WAIT_1"

		elif self.client_state == "ESTABLISHED" and\
			flags == self.build_flags("FA"):
			self.client_state = "FIN_WAIT_1"

		elif self.client_state == "FIN_WAIT_1" and\
			flags == self.build_flags("A"):
			self.client_state = "CLOSED"

		elif self.client_state == "ESTABLISHED" and\
			self.server_state == "CLOSE_WAIT" and\
			flags & self.build_flags("A") > 0:
			self.client_state = "CLOSED"

		if self.server_state == "FIN_WAIT_1" and\
			self.client_state == "CLOSED" and\
			flags == self.build_flags("A"):
			self.server_state = "CLOSED"
			server_got_closed = True
			client_got_closed = True

		if client_got_closed:
			self.client_close_timed = pkt.time
		if server_got_closed:
			self.server_close_timed = pkt.time

		return self.is_closed()

	def handle_server_packet(self, pkt):
		flags = pkt['TCP'].flags
		server_got_closed = False
		client_got_closed = False

		if flags == self.build_flags("R"):
			self.client_state = "CLOSED"
			self.server_state = "CLOSED"
			server_got_closed = True
			client_got_closed = True

		elif flags == self.build_flags("RA"):
			self.client_state = "CLOSED"
			self.server_state = "CLOSED"
			server_got_closed = True
			client_got_closed = True

		elif flags == self.build_flags("S"):
			self.server_state = "SYN_SENT"
		elif self.server_state == "LISTEN" and\
			flags == self.build_flags("SA"):
			self.server_state = "SYN_RCVD"

		elif self.server_state == "ESTABLISHED" and\
			flags == self.build_flags("FA"):
			self.server_state = "FIN_WAIT_1"

		elif self.server_state == "FIN_WAIT_1" and\
			flags == self.build_flags("A"):
			self.server_state = "CLOSED"
			server_got_closed = True


		elif self.server_state == "SYN_RCVD" and\
			flags == self.build_flags("F"):
			self.server_state = "FIN_WAIT_1"

		elif self.server_state == "FIN_WAIT_1" and\
			flags == self.build_flags("FA"):
			self.server_state = "CLOSED"

		elif self.server_state == "SYN_RCVD" and\
			flags == self.build_flags("A"):
			self.server_state = "ESTABLISHED"

		elif self.server_state == "ESTABLISHED" and\
			flags & self.build_flags("F") > 0:
			self.server_state = "CLOSE_WAIT"

		elif self.client_state == "FIN_WAIT_1" and\
			flags == self.build_flags("FA"):
			self.server_state = "CLOSED"
			server_got_closed = True

		elif self.client_state == "CLOSED" and\
			flags == self.build_flags("A"):
			self.server_state = "CLOSED"
			server_got_closed = True

		if self.client_state == "FIN_WAIT_1" and\
			self.server_state == "CLOSED" and\
			flags == self.build_flags("A"):
			self.client_state = "CLOSED"
			client_got_closed = True

		if client_got_closed:
			self.client_close_timed = pkt.time
		if server_got_closed:
			self.server_close_timed = pkt.time

		return self.is_closed()

In [34]:

# Stream the capture and group TCP packets into sessions.
#   sessions: Flow -> TCPSession for connections that are still open
#   closed:   sessions in the order they were seen to close
sessions = {}
closed = []

# PcapReader yields packets one at a time instead of loading the whole file;
# the `with` block closes the underlying file even if a packet handler raises.
with scapy.all.PcapReader('../data/pcap/eq1.pcap') as reader:
    for pkt in reader:
        if 'IP' not in pkt or 'TCP' not in pkt:
            continue

        flow = Flow(pkt)
        session = sessions.get(flow)

        if session is None:
            # Tracking starts only at a bare SYN; packets of connections whose
            # start was not captured are ignored.
            if is_syn_pkt(pkt):
                sessions[flow] = TCPSession(pkt)
            continue

        session.add(pkt)

        if session.is_closed():
            closed.append(session)
            # Drop the entry so a later SYN on the same endpoints starts a
            # fresh session.
            del sessions[flow]

for session in closed:
    print(session.flow, len(session.packets))

  220.223.79.67:59497 ==>  66.220.170.137:443   2
 73.111.224.253:59567 ==>      3.249.2.79:4244  3
   39.111.20.37:34075 ==>    1.145.234.88:443   2
 49.117.208.213:25322 ==>   43.206.171.27:80    3
  212.83.159.46:56505 ==>    1.96.152.232:443   2
  73.240.191.11:43835 ==>   1.145.238.113:443   2
 39.181.220.130:42985 ==>   1.145.229.105:8253  3
 70.163.200.205:39325 ==>    5.207.250.70:80    7
 100.164.41.164:63177 ==>     43.43.40.72:443   6
   28.175.9.102:20943 ==>    1.144.109.80:443   2
   69.15.151.81:30029 ==>   43.206.171.27:80    2
  73.216.69.112:62416 ==>  35.240.203.240:443   9
   212.83.159.7:1919  ==>    1.96.186.235:443   2
  206.145.90.69:53518 ==>   3.238.132.174:443   2
  143.18.38.128:50765 ==>  35.240.203.213:80    2
  99.57.187.115:58845 ==>    1.34.226.232:843   3
   150.2.159.46:63403 ==>  198.79.237.165:80    13
 70.191.125.113:49471 ==>    5.207.250.70:80    7
 130.60.111.195:63344 ==>   1.107.170.192:443   2
 113.158.204.72:59812 ==>   1.107.162.194:443   2

In [53]:
import glob

# Replay every capture in test/ through its own TCPSession, printing each
# packet's flow and decoded flag letters.  Each file is treated as holding a
# single TCP session whose first TCP packet is the opening SYN; TCPSession
# raises otherwise.
# NOTE: this rebinds `closed`, replacing the list built by the previous cell.
closed = []

# sorted(): glob returns files in arbitrary order; sorting keeps the printed
# output and the order of `closed` reproducible between runs.
for fn in sorted(glob.glob('test/*.pcap')):
	session = None

	for pkt in scapy.all.rdpcap(fn):
		# Flow() and TCPSession need both layers; skip anything else (ARP,
		# IPv6, ...) instead of crashing on it.
		if 'IP' not in pkt or 'TCP' not in pkt:
			continue

		flow = Flow(pkt)
		flags = pkt['TCP'].flags
		# Decode the flag bits into letters, in FLAGS' key order.
		flags = ''.join(f for f,x in FLAGS.items() if x & flags)

		if session is None:
			session = TCPSession(pkt)
		else:
			session.add(pkt)

		print(flow, flags)

	# A capture without any TCP/IP packet produces no session; do not store
	# a None placeholder for it.
	if session is not None:
		closed.append(session)


   192.168.0.11:52350 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52350 RA
   192.168.0.11:52362 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52362 RA
   192.168.0.11:52370 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52370 RA
   192.168.0.11:52366 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52366 RA
   192.168.0.11:52365 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52365 RA
   192.168.0.11:52369 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52369 RA
   192.168.0.11:52361 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52361 RA
   192.168.0.11:52349 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52349 RA
   192.168.0.11:52372 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11:52372 RA
   192.168.0.11:52356 ==>    192.168.0.12:666   S
   192.168.0.12:666   ==>    192.168.0.11