# ESnet Netflow Data

## Python API
The Python package pynfdump is a front end to the nfdump tool, providing a programmable interface that is very useful when processing NetFlow data. The package also supports remote access (via ssh).

Unfortunately, the version of the package that can be installed via pip is quite old and does not work. The latest version checked in to the GitHub repository does work: https://github.com/JustinAzoff/pynfdump/blob/master/pynfdump/nfdump.py

Also, while very useful, the pynfdump package does not return all of the features available in NetFlow data; in particular, it does not return the "next hop" feature.

This notebook uses a modified version of nfdump.py that not only returns all of the features but also parses them into easy-to-use tables. The code can be found at the bottom of this page (Annex) and needs to be executed before running any of the following cells.

The following retrieves netflow data from router wash-cr5.

Note that paths and remote hosts may differ for your data sets.

In [154]:
# Where the nfsen data tree lives on the flow collector, which host that is,
# and which router's flows to read.
remote_root_path = '/nfs/netflow/data'
remote_host = 'flowbox1.es.net'
router = 'wash-cr5'
path = remote_root_path

# nfdump is executed on remote_host over ssh and reads <path>/raw/<router>.
dumper = Dumper(datadir=path, profile='raw', sources=[router], remote_host=remote_host)

In [155]:
# Restrict the query to 11:00-15:00 on 2018-02-20; set_where turns these times
# into an nfcapd file range that is passed to nfdump with -R.
dumper.set_where(start="2018-02-20 11:00", end="2018-02-20 15:00")

In [156]:
# Ask for the first 3 flow records (nfdump -c 3). search() returns a lazy
# generator of CSV rows; ssh/nfdump only runs once it is consumed.
records = dumper.search(limit=3)

In [158]:
# Consume the records and pivot them into a dict of columns keyed by the
# nfdump CSV field names (e.g. 'sa', 'da', 'nh', 'nhb').
results = import_data(records)

In [179]:
# Source IP, destination IP, next hop and BGP next hop of each record
# (Python 2 print statement).
for srcip,dstip,next_hop,next_hop_bgp in zip(results['sa'],results['da'],results['nh'],results['nhb']):
    print srcip,dstip,next_hop,next_hop_bgp

134.158.84.23 134.79.128.10 134.55.36.45 134.55.36.45
160.91.22.86 172.217.15.67 134.55.219.2 134.55.219.2
151.101.202.62 134.167.1.1 134.55.42.69 134.55.42.69


In [172]:
# Display the column dictionary built by import_data.
results

{'al': ['    0.000', '    0.000', '    0.000'],
 'cl': ['    0.000', '    0.000', '    0.000'],
 'da': ['134.79.128.10', '172.217.15.67', '134.167.1.1'],
 'das': ['3671', '15169', '291'],
 'dir': ['0', '0', '0'],
 'dmk': ['16', '24', '16'],
 'dp': ['36443', '443', '11081'],
 'dtos': ['0', '0', '0'],
 'dvln': ['0', '0', '0'],
 'eng': ['0/0', '0/0', '0/0'],
 'exid': ['1', '1', '1'],
 'flg': ['.A....', '.AP...', '.A....'],
 'fwd': ['64', '64', '64'],
 'ibyt': ['60000000', '304000', '1420000'],
 'idmc': ['00:00:00:00:00:00', '00:00:00:00:00:00', '00:00:00:00:00:00'],
 'in': ['12', '14', '17'],
 'ipkt': ['40000', '1000', '1000'],
 'ismc': ['00:00:00:00:00:00', '00:00:00:00:00:00', '00:00:00:00:00:00'],
 'mpls1': ['0-0-0', '0-0-0', '0-0-0'],
 'mpls10': ['0-0-0', '0-0-0', '0-0-0'],
 'mpls2': ['0-0-0', '0-0-0', '0-0-0'],
 'mpls3': ['0-0-0', '0-0-0', '0-0-0'],
 'mpls4': ['0-0-0', '0-0-0', '0-0-0'],
 'mpls5': ['0-0-0', '0-0-0', '0-0-0'],
 'mpls6': ['0-0-0', '0-0-0', '0-0-0'],
 'mpls7': ['0-0-0',

In [50]:
# IPython shell escape: run nfdump directly on flowbox1 and capture its stdout
# lines (CSV header followed by data rows) into the list z.
z = !ssh flowbox1.es.net  'nfdump -M /nfs/netflow/data/raw/wash-cr5/2018/02/20 -R . -c 3 -o csv -t 2018/02/20.00:00-2018/02/20.15:00'

In [3]:
# IPython shell escape: print a single record showing only the source and
# destination addresses (custom output format fmt:%sa%da).
!ssh flowbox1.es.net  'nfdump -M /nfs/netflow/data/raw/wash-cr5/2018/02/20 -R . -c 1 -o fmt:%sa%da -t 2018/02/20.00:00-2018/02/20.15:00'

     Src IP Addr     Dst IP Addr
  198.124.238.49 198.129.254.138
Summary: total flows: 1, total bytes: 42000, total packets: 1000, avg bps: 0, avg pps: 0, avg bpp: 0
Time window: 2018-02-19 23:58:59 - 2018-02-20 00:04:58
Total flows processed: 7613, Blocks skipped: 0, Bytes read: 1048476
Sys: 0.071s flows/second: 105753.7   Wall: 0.009s flows/second: 838898.1  
Killed by signal 1.


In [None]:
import csv


def load_mbps(filename='/tmp/mbps.csv'):
    """Read throughput measurements from a small CSV file.

    The original cell had a bare ``return`` at top level (a SyntaxError) and
    appended to an undefined ``res``; it is wrapped in a function here.

    Each row is parsed as ``label: value`` cells in the order size, rate,
    duration -- e.g. ``Size: 10G,Rate: 12.5,Duration: 30 sec``.  That layout
    is inferred from the parsing below; NOTE(review): confirm against whatever
    produces the file.

    :param filename: CSV file to read
    :returns: dict with lists under ``'mbps'`` (floats), ``'duration'`` and
        ``'size'`` (strings), one entry per non-blank row
    """
    res = {'mbps': [], 'duration': [], 'size': []}
    with open(filename) as f:
        for row in csv.reader(f):
            if not row:
                continue  # csv.reader yields [] for blank lines
            res['mbps'].append(float(row[1].split(':')[1].lstrip()))
            # Drop a 4-character suffix -- presumably a unit such as " sec".
            res['duration'].append(row[2].split(':')[1][:-4].lstrip())
            res['size'].append(row[0].split(':')[-1].lstrip())
    return res

In [56]:
import csv
# z[0] is the CSV header, so z[1] is the first data row; parse just that row.
r = csv.reader([z[1]])

In [57]:
# Print the parsed row as a list of fields (Python 2 print statement).
for x in r:
    print x

['2018-02-20 00:00:00', '2018-02-20 00:00:00', '0.000', '198.124.238.49', '198.129.254.138', '8807', '9641', 'UDP', '......', '64', '0', '1000', '42000', '0', '0', '12', '16', '65291', '292', '30', '30', '0', '0', '134.55.36.45', '134.55.36.45', '0', '0', '00:00:00:00:00:00', '00:00:00:00:00:00', '00:00:00:00:00:00', '00:00:00:00:00:00', '0-0-0', '0-0-0', '0-0-0', '0-0-0', '0-0-0', '0-0-0', '0-0-0', '0-0-0', '0-0-0', '0-0-0', '    0.000', '    0.000', '    0.000', '134.55.200.147', '0/0', '1', '2018-02-20 00:00:10.187']


In [60]:

# Raw lines captured by the ssh cell: the CSV header followed by data rows.
z

['ts,te,td,sa,da,sp,dp,pr,flg,fwd,stos,ipkt,ibyt,opkt,obyt,in,out,sas,das,smk,dmk,dtos,dir,nh,nhb,svln,dvln,ismc,odmc,idmc,osmc,mpls1,mpls2,mpls3,mpls4,mpls5,mpls6,mpls7,mpls8,mpls9,mpls10,cl,sl,al,ra,eng,exid,tr',
 '2018-02-20 00:00:00,2018-02-20 00:00:00,0.000,198.124.238.49,198.129.254.138,8807,9641,UDP,......,64,0,1000,42000,0,0,12,16,65291,292,30,30,0,0,134.55.36.45,134.55.36.45,0,0,00:00:00:00:00:00,00:00:00:00:00:00,00:00:00:00:00:00,00:00:00:00:00:00,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,    0.000,    0.000,    0.000,134.55.200.147,0/0,1,2018-02-20 00:00:10.187',
 '2018-02-20 00:00:00,2018-02-20 00:00:00,0.000,130.199.39.193,96.127.69.64,54805,443,TCP,.A....,64,0,1000,1500000,0,0,12,16,43,8987,16,17,0,0,134.55.36.45,134.55.36.45,0,0,00:00:00:00:00:00,00:00:00:00:00:00,00:00:00:00:00:00,00:00:00:00:00:00,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,0-0-0,    0.000,    0.000,    0.000,134.55.200.147,0/0,1,2018-02-20 00:00:10.187',
 '2018-02-20 00:00

## Annex: modified nfdump.py

In [153]:
# nfdump.py
# Copyright (C) 2008 Justin Azoff JAzoff@uamail.albany.edu
#
# This module is released under the MIT License:
# http://www.opensource.org/licenses/mit-license.php
"""
Python frontend to the nfdump CLI
"""

import os
from dateutil.parser import parse as parse_date
import datetime
fromtimestamp = datetime.datetime.fromtimestamp

from subprocess import Popen, PIPE
import select
import commands

from IPy import IP

# strftime pattern for the timestamp in nfcapd file names: YYYYMMDDhhmm, no separators.
FILE_FMT = "%Y%m%d%H%M"

def load_protocols(path="/etc/protocols"):
    """Build a mapping of IP protocol number -> protocol name.

    Python's socket module only offers ``getprotobyname``, so the protocols
    database is parsed directly.  Data lines look like
    ``name  number  [aliases...]  [# comment]``.  Every such line in the file
    is read.  Blank lines, comment lines, and lines without a numeric second
    field are skipped.  The original stopped at blank lines, which left the
    map empty on systems whose file has no blank line after its header.
    Number 0 is always mapped to ``'ip'``.

    :param path: protocols file to read (defaults to ``/etc/protocols``)
    :returns: dict of ``{int: str}``
    :raises IOError/OSError: if *path* cannot be opened
    """
    protocols = {}
    with open(path) as f:
        for line in f:
            # Drop any trailing comment, then split on runs of whitespace.
            fields = line.split('#', 1)[0].split()
            if len(fields) < 2:
                continue
            try:
                protocols[int(fields[1])] = fields[0]
            except ValueError:
                continue  # not a "name number" line
    protocols[0] = 'ip'
    return protocols

def date_to_fn(date):
    """Return the nfcapd capture-file name for *date*.

    The timestamp part uses FILE_FMT, so 2018-02-20 11:00 becomes
    ``nfcapd.201802201100``.
    """
    stamp = date.strftime(FILE_FMT)
    return 'nfcapd.%s' % stamp

# Stream tags yielded by mycommunicate().
STDOUT = 1
STDERR = 2
def mycommunicate(cmds):
    """Run *cmds* and yield ``(fd, data)`` pairs as the child produces output.

    *fd* is ``STDOUT`` for one line read from the child's stdout, or
    ``STDERR`` for a chunk of up to 1024 bytes read from its stderr.  A
    stderr chunk starting with ``Killed by signal 1.`` (a message emitted by
    ssh) is treated as end of stderr rather than as data.

    EOF is detected with a falsy read, so this works whether the pipes
    return ``str`` (Python 2) or ``bytes`` (Python 3).

    Errors are no longer swallowed.  If the generator is closed or abandoned
    before both streams reach EOF, the remaining pipes are closed and the
    child is terminated before ``wait()``.  Otherwise ``wait()`` could block
    forever on a child stuck writing to a full pipe.
    """
    pipe = Popen(cmds, stdout=PIPE, stderr=PIPE)
    read_set = [pipe.stderr, pipe.stdout]

    # yield inside try/finally is allowed since Python 2.5 (PEP 342).
    try:
        while read_set:
            rlist, _, _ = select.select(read_set, [], [])

            if pipe.stdout in rlist:
                data = pipe.stdout.readline()
                if not data:
                    pipe.stdout.close()
                    read_set.remove(pipe.stdout)
                else:
                    yield STDOUT, data

            if pipe.stderr in rlist:
                data = os.read(pipe.stderr.fileno(), 1024)
                if not data or data.startswith(b"Killed by signal 1."):
                    pipe.stderr.close()
                    read_set.remove(pipe.stderr)
                else:
                    yield STDERR, data
    finally:
        if read_set:
            # Stopped early (consumer closed us or an error occurred).
            for stream in read_set:
                stream.close()
            try:
                pipe.terminate()
            except OSError:
                pass  # child already gone
        pipe.wait()

def run(cmds):
    """Yield the stdout output of *cmds*, one line at a time.

    :raises NFDumpError: on the first chunk the command writes to stderr
    """
    for stream, chunk in mycommunicate(cmds):
        if stream == STDERR:
            raise NFDumpError(chunk)
        yield chunk

def maybe_int(val):
    """Return ``int(val)`` when that conversion works, else *val* unchanged."""
    try:
        return int(val)
    except (TypeError, ValueError):
        return val

def maybe_split(val, sep):
    """Split *val* on *sep* if it is string-like; return anything else as-is.

    "String-like" means the object has a ``split`` attribute, so lists and
    ``None`` pass straight through.
    """
    if not hasattr(val, 'split'):
        return val
    return val.split(sep)

def flags_to_str(flags):
    """Render TCP flag bits as a six-character ``UAPRSF`` string.

    Each position shows its letter when the bit is set and ``.`` otherwise.
    This is the same layout as the ``flg`` column (e.g. ``.A....``), and
    ``flags_to_str(0x12) == '.A..S.'``.
    """
    bit_letters = ((32, 'U'), (16, 'A'), (8, 'P'), (4, 'R'), (2, 'S'), (1, 'F'))
    return ''.join(letter if flags & mask else '.' for mask, letter in bit_letters)

def import_data(data, features=None):
    """Pivot nfdump CSV rows into a dictionary of columns.

    :param data: iterable of rows (lists of strings), e.g. the generator
        returned by ``Dumper.search``.  A row must hold at least one value per
        feature; extra trailing values are ignored.  Rows whose fields are all
        blank (such as an empty output line) are skipped.
    :param features: column names, as a list or a comma-separated string.
        Defaults to the header that nfdump prints for ``-o csv`` output.
    :returns: dict mapping each feature name to the list of its values, in
        row order.
    :raises IndexError: if a non-blank row has fewer values than *features*.
    """
    if features is None:
        features = ('ts,te,td,sa,da,sp,dp,pr,flg,fwd,stos,ipkt,ibyt,opkt,obyt,in,out,'
                    'sas,das,smk,dmk,dtos,dir,nh,nhb,svln,dvln,ismc,odmc,idmc,osmc,'
                    'mpls1,mpls2,mpls3,mpls4,mpls5,mpls6,mpls7,mpls8,mpls9,mpls10,'
                    'cl,sl,al,ra,eng,exid,tr')
    if hasattr(features, 'split'):
        features = features.split(',')

    columns = [[] for _ in features]
    for entry in data:
        if len(entry) < len(features):
            if not ''.join(entry).strip():
                continue  # blank line in the nfdump output
            raise IndexError('row has %d fields, expected at least %d: %r'
                             % (len(entry), len(features), entry))
        for column, value in zip(columns, entry):
            column.append(value)
    return dict(zip(features, columns))

class NFDumpError(Exception):
    """Raised when nfdump writes to stderr or a search is given conflicting options."""

class Dumper(object):
    """Build nfdump command lines for an nfsen-style data tree and parse the output.

    nfdump runs locally, or through ``ssh`` on *remote_host* when one is given.
    """
    def __init__(self, datadir='/', profile='live',sources=None,remote_host=None,executable_path='nfdump'):
        """
        :param datadir: root of the nfsen data tree (a trailing ``/`` is added)
        :param profile: profile directory below *datadir*, e.g. ``'raw'``
        :param sources: source (router) names, as a list or comma-separated string
        :param remote_host: run nfdump on this host over ssh; ``None`` runs locally
        :param executable_path: nfdump binary, or a directory containing it
        """
        if not datadir.endswith("/"):
            datadir = datadir + '/'
        self.datadir = datadir
        self.profile = profile
        self.sources = maybe_split(sources, ',')
        self.remote_host = remote_host
        self.exec_path = executable_path
        # NOTE: this check uses the local filesystem even when remote_host is set.
        if os.path.isdir(self.exec_path):
            self.exec_path = os.path.join(self.exec_path, "nfdump")
        self.set_where()
        self.protocols = load_protocols()

    def set_where(self, start=None, end=None, filename=None,dirfiles=None, stdin=False):
        """Set the timeframe of the nfdump query.

        Specify one of the following:
            * The start date
            * The start and end date
            * one of the filename, dirfiles, or stdin options
        With none of them, every file (``-R .``) is searched.

        :param start: Start date and time (any string dateutil can parse)
        :param end: End date and time (any string dateutil can parse)
        :param filename: Search this single filename (``-r``)
        :param dirfiles: Search this directory / file range (``-R``)
        :param stdin:    Search stdin (``-r -``)
        """
        self.start = start
        self.end = end

        self.sd = self.ed = None

        if start:
            self.sd = parse_date(start)
        if end:
            self.ed = parse_date(end)

        if not self.sd:
            self._where = '.'
        else:
            # nfdump -R accepts "firstfile:lastfile"
            self._where = date_to_fn(self.sd)
            if self.ed:
                self._where += ":" + date_to_fn(self.ed)

        if dirfiles:
            self._where = dirfiles

        self.filename = filename
        if stdin:
            self.filename = '-'

    def _arg_escape(self, arg):
        """Shell-quote *arg* when the command goes through ssh (the remote
        shell re-parses it); locally the argument is passed unchanged."""
        if self.remote_host:
            return commands.mkarg(arg)
        else:
            return arg

    def _base_cmd(self, output_format='csv'):
        """Return the argv shared by all queries.

        The argv is assembled in this order:
            * the ``ssh`` prefix, when remote
            * ``-q`` and ``-o <output_format>``
            * ``-M`` with the source directories, when datadir, sources and
              profile are all set
            * ``-r <filename>`` when a file was selected, else ``-R <range>``

        :param output_format: value for nfdump's ``-o`` option
        """
        cmd = []
        if self.remote_host:
            cmd = ['ssh', self.remote_host]
        cmd.extend([self.exec_path, '-q', '-o', output_format])

        if self.datadir and self.sources and self.profile:
            # -M takes "base/dir1:dir2:..." to read several source directories.
            sources = ':'.join(self.sources)
            d = os.path.join(self.datadir, self.profile, sources)
            cmd.extend(['-M', d])

        if self.filename:
            cmd.extend(['-r', self.filename])
        else:
            cmd.extend(['-R', self._where])
        return cmd

    def search(self, query='', filterfile=None, aggregate=None, statistics=None, statistics_order=None,limit=None):
        """Run nfdump with the following arguments

        :param query: The nfdump filter
        :param filterfile: an optional file containing a nfdump filter
        :param aggregate: (True OR comma sep string OR list) of
            * srcip     - Source IP Address
            * dstip     - Destination IP Address
            * srcport   - Source Port
            * dstport   - Destination Port
        :param statistics: Generate netflow statistics info, one of
            * srcip     - Source IP Address
            * dstip     - Destination IP Address
            * ip        - Any IP Address
            * srcport   - Source Port
            * dstport   - Destination Port
            * port      - Any Port
            * srcas     - Source ASN
            * dstas     - Destination ASN
            * as        - Any ASN
            * inif      - Incoming Interface
            * outif     - Outgoing Interface
            * proto     - Protocol
        :param statistics_order: one of
            * packets
            * bytes
            * flows
            * bps       - Bytes Per Second
            * pps       - Packets Per Second
            * bpp       - Bytes Per Packet
        :param limit: number of results
        :returns: a lazy generator -- CSV rows (lists of strings) for flow
            searches, dicts for statistics
        :raises NFDumpError: if both aggregate and statistics are given, or
            (while iterating) if nfdump writes to stderr
        """
        if aggregate and statistics:
            raise NFDumpError("Specify only one of aggregate and statistics")

        # parse_stats splits nfdump's pipe-separated statistics output, while
        # parse_search splits plain flow records as CSV.
        cmd = self._base_cmd('pipe' if statistics else 'csv')

        if statistics:
            s_arg = statistics
            if statistics_order:
                s_arg = "%s/%s" % (statistics, statistics_order)
            cmd.extend(["-s", s_arg])

        if aggregate:
            if aggregate is True:
                cmd.append("-a")
            else:
                aggregate = ','.join(maybe_split(aggregate, ','))
                aggregate = aggregate.replace(" ", "")
                cmd.extend(["-a", "-A", self._arg_escape(aggregate)])

        if limit:
            # -n caps the top-N statistics lines, -c the number of flow records.
            cmd.extend(['-n' if statistics else '-c', str(limit)])

        if filterfile:
            cmd.extend(['-f', filterfile])
        else:
            cmd.append(self._arg_escape(query))

        out = run(cmd)
        if statistics:
            return self.parse_stats(out, object_field=statistics)
        return self.parse_search(out)

    def parse_search(self, out):
        """Yield each non-blank line of ``-o csv`` output as a list of fields.

        The line terminator is stripped first so it does not end up in the
        last field; other whitespace inside fields is preserved.
        """
        for line in out:
            line = line.rstrip('\r\n')
            if not line:
                continue
            yield line.split(',')

    def parse_stats(self, out,object_field):
        """Yield one dict per statistics line of nfdump's pipe-separated output.

        Lines with 10 or fewer ``|``-separated fields are skipped.
        NOTE(review): the column layout below (af, first, msec_first, last,
        msec_last, proto, address fields, then flows/packets/bytes/pps/bps/bpp)
        is carried over from pynfdump -- confirm against the nfdump version in use.

        :param object_field: the statistics key; it is also used as the dict
            key of the aggregated object and wrapped in ``IP`` when it
            contains ``'ip'``.
        """
        for line in out:
            parts = line.split("|")
            parts = [maybe_int(x) for x in parts]
            if not len(parts) > 10:
                continue
            # Heuristic from pynfdump: with four consecutive zero fields the
            # object value is read from column 9, otherwise from column 6.
            if '0|0|0|0' in line:
                object_idx = 9
            else:
                object_idx = 6
            row = {
                'af':           parts[0],
                'first':        fromtimestamp(parts[1]),
                'last':         fromtimestamp(parts[3]),
                'prot':         self.protocols.get(parts[5], parts[5]),
                object_field:   parts[object_idx],
                'flows':        parts[object_idx+1],
                'packets':      parts[object_idx+2],
                'bytes':        parts[object_idx+3],
                'pps':          parts[object_idx+4],
                'bps':          parts[object_idx+5],
                'bpp':          parts[object_idx+6],
            }

            if 'ip' in object_field:
                row[object_field] = IP(row[object_field])

            yield row

    def list_profiles(self):
        """Return a list of the nfsen profiles (entries of datadir)."""
        if not self.remote_host:
            return os.listdir(self.datadir)
        # run() is a generator yielding output lines, so join before splitting
        # (the original indexed the generator, which raises TypeError).
        return ''.join(run(['ssh', self.remote_host, '/bin/ls', self.datadir])).split()

    def get_profile_data(self, profile=None):
        """Return a dictionary of the nfsen profile data.

        Reads ``<datadir>/<profile>/profile.dat``.  It skips blank lines and
        lines starting with a space or ``#``, and splits the rest on
        ``" = "``.  ``channel`` entries are collected (name before the first
        ``:``) under ``'sourcelist'``.
        """
        p = profile or self.profile

        path = os.path.join(self.datadir,p,'profile.dat')

        if not self.remote_host:
            with open(path) as f:
                data = f.read()
        else:
            # run() yields lines; join them to get the whole file.
            data = ''.join(run(['ssh', self.remote_host, '/bin/cat', path]))

        ret = {}
        sourcelist = []
        for line in data.splitlines():
            if not line: continue
            if line[0] in ' #': continue
            key, val = line.split(" = ", 1)
            if key == 'channel':
                chan = val.split(":")[0]
                sourcelist.append(chan)
                continue

            ret[key] = maybe_int(val)
        if sourcelist:
            ret['sourcelist'] = sourcelist
        return ret

    def flow_stats(self):
        """Run nfdump -I to get flow stats"""
        cmd = self._base_cmd()
        cmd.append("-I")
        out = run(cmd)
        return self.parse_flow_stats(out)

    def parse_flow_stats(self, out):
        """Parse ``Key: value`` lines into a dict with lower-cased keys.

        Values are converted with ``maybe_int``.  Lines without ``": "`` are
        skipped.  Only the first ``": "`` separates key from value.
        """
        stats = {}
        for line in out:
            if ": " not in line:
                continue
            key, value = line.split(": ", 1)
            key = key.strip().lower()
            value = maybe_int(value.strip())
            stats[key] = value
        return stats

def search_file(filename, query='', filterfile=None, aggregate=None, statistics=None, statistics_order=None,limit=None):
    """Search a single nfcapd file with a default Dumper.

    :param filename: the file to search (passed to nfdump as ``-r``)
    The remaining options are forwarded unchanged to :func:`Dumper.search`.
    """
    single_file = Dumper()
    single_file.set_where(filename=filename)
    return single_file.search(query=query, filterfile=filterfile,
                              aggregate=aggregate, statistics=statistics,
                              statistics_order=statistics_order, limit=limit)

def flow_stats_file(filename):
    """Return the ``nfdump -I`` statistics dict for one nfcapd file."""
    single_file = Dumper()
    single_file.set_where(filename=filename)
    return single_file.flow_stats()