### **Scalable Analysis of Network Measurements using Apache Spark**

Network operators look at different types of network traffic data to understand properties of their networks. One type of network measurement data is called passive network measurement data. Passive measurement data is collected directly from network devices (e.g., routers, switches) and reflects properties of routes or traffic that is already passing through the network.

In this assignment, you will analyze two types of passive network measurement data (traffic volumes and routes) to understand various properties of the traffic that traverses the Princeton campus network.

#### Traffic Measurement with IPFIX

Routers in most networks can export IPFIX flow measurements (https://en.wikipedia.org/wiki/IP_Flow_Information_Export). The measurements we provide in this assignment are in NetFlow format (https://en.wikipedia.org/wiki/NetFlow), Cisco's proprietary flow-export format on which the IPFIX standard is based. The Cisco pages (http://www.cisco.com/c/en/us/products/ios-nx-os-software/ios-netflow/index.html) have more information on NetFlow. In this part of the assignment, you'll analyze a trace of NetFlow records captured from a router that connects the campus network to the Internet. You will perform the same kinds of analysis that a network operator would, asking questions about the top endpoints for traffic on the Princeton campus, the top traffic by port number, and so forth. (As you can imagine, the ability to analyze these baselines will come in handy when we start to think about security!)

The flow records are in the file 'netflow.csv' on Blackboard under course materials. Download it and place it in the directory 'assignment2/Passive Measurement/data'. To simplify the analysis, we have ensured that the IP addresses of the Princeton campus network start with 128.112 and have their 16 lower bits anonymized, to protect the privacy of users on the campus network (i.e., all of us!). To further simplify your task, we have parsed these records into CSV (comma-separated values) format, with the names of the fields listed in the first row of the file. (In a real network, routers export IPFIX records in a binary format.)
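Because all campus addresses fall inside 128.112.0.0/16, you can tell whether a flow enters or leaves campus by checking the first two octets of its source and destination addresses. The plain-Python sketch below illustrates this; `is_campus` and `flow_direction` are hypothetical helpers (not part of the provided assignment code), and the direction labels are our own naming.

```python
# Hypothetical helpers, not part of the provided assignment code.
CAMPUS_PREFIX = ('128', '112')  # first two octets of 128.112.0.0/16


def is_campus(ip):
    """Return True if the dotted-quad string ip lies in 128.112.0.0/16."""
    octets = ip.strip().split('.')
    return len(octets) == 4 and tuple(octets[:2]) == CAMPUS_PREFIX


def flow_direction(srcip, dstip):
    """Classify a flow relative to the campus network."""
    src_in, dst_in = is_campus(srcip), is_campus(dstip)
    if dst_in and not src_in:
        return 'inbound'    # Internet -> campus
    if src_in and not dst_in:
        return 'outbound'   # campus -> Internet
    if src_in and dst_in:
        return 'internal'   # campus -> campus
    return 'other'          # neither endpoint on campus


print(flow_direction('172.16.241.1', '128.112.213.189'))
```

Matching on the first two octets works only because the campus block is exactly a /16; for an arbitrary prefix length you would compare masked integer addresses instead.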

#### Measurement of Interdomain Routing with BGP Routing Tables

To help network operators understand the state of Internet routing, many routers also have the ability to "dump" BGP routing tables periodically into a static file. These routing tables contain information about each IP prefix, all BGP routes that the router learns for each prefix, and the "best" BGP route that the router ultimately selects. Analyzing the BGP routing tables can provide information about where traffic to different IP prefixes is destined.

In this assignment, we have provided you a routing table dump from a project called RouteViews (http://www.routeviews.org/). You can visit that site to learn more about the routing tables that they collect. `telnet route-views2.routeviews.org` will also give you a command-line prompt at a real BGP router at the routeviews project if you want to play around with a live routing table view.  http://routeviews.org/bgpdata/ has periodic binary routing table dumps and update logs from participating routers. Again, for this assignment, we have parsed the binary routing table dumps into a format that is easier to analyze directly.

#### Distributed Data Analysis with MapReduce and Apache Spark

The volume of data generated by networks has become so large that it can be difficult to process on a single machine. By relying on parallel data analysis techniques, network operators can analyze much more data, more quickly, which enables faster responses to network events (e.g., traffic shifts, attacks).

In this assignment, you will use the Apache Spark framework to perform parallel data analysis. Apache Spark is a cluster computing framework designed for fast computation. It generalizes the MapReduce model popularized by Hadoop and extends it to support more types of computation efficiently, including interactive queries and stream processing. For more information, read the Spark documentation at https://spark.apache.org/documentation.html. We will cover the basic functionality of Spark during the precepts, and the basic tests below give you some idea of how Spark works.

Much of the distributed analysis that Spark enables is based on the MapReduce framework. See http://hci.stanford.edu/courses/cs448g/a2/files/map_reduce_tutorial.pdf for a brief high-level overview of MapReduce. We'll also cover this topic in more detail in precept.
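As a concrete picture of the three MapReduce stages, here is a single-machine sketch in plain Python with no Spark dependency; the function names are hypothetical. In a real cluster, the map and reduce stages run in parallel on many workers, and the shuffle moves all values for a given key to the same worker.

```python
from collections import defaultdict
from functools import reduce


def map_phase(records, mapper):
    """Apply mapper to every record; each call emits a list of (key, value) pairs."""
    pairs = []
    for record in records:
        pairs.extend(mapper(record))
    return pairs


def shuffle_phase(pairs):
    """Group all emitted values by key, as the framework does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups, reducer):
    """Combine the values for each key with a binary reducer function."""
    return dict((key, reduce(reducer, values)) for key, values in groups.items())


# Toy input: destination ports of a few flows (made-up values).
ports = ['80', '443', '80', '53', '443', '80']
counts = reduce_phase(shuffle_phase(map_phase(ports, lambda p: [(p, 1)])),
                      lambda a, b: a + b)
print(counts)
```

The mapper emits `(port, 1)` for each record and the reducer adds values, so the result is a count per port; swapping the `1` for a byte count turns the same pipeline into a volume sum.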

This notebook has several parts. Each part starts with the instructions that you need to follow to complete the assignment. 

### ** Part 1: Test Spark functionality **

This part checks whether your environment is working properly. You don't need to write new code, but make sure that you execute the provided code to check your environment.

#### ** (1a) Parallelize, filter, and reduce **

```python
# Check if Spark is working
largeRange = sc.parallelize(xrange(100000))
reduceTest = largeRange.reduce(lambda a, b: a + b)
filterReduceTest = largeRange.filter(lambda x: x % 7 == 0).sum()

# If the Spark jobs don't work properly these will raise an AssertionError
assert reduceTest == 4999950000
assert filterReduceTest == 714264285
```
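The two constants in these assertions follow from the arithmetic-series formula, sum = n(first + last)/2. A quick check in plain Python (no Spark needed); the helper name `arithmetic_sum` is hypothetical:

```python
def arithmetic_sum(first, last, step):
    """Sum of first, first + step, ..., last, computed as n * (first + last) / 2."""
    n = (last - first) // step + 1          # number of terms
    return n * (first + last) // 2          # exact: n * (first + last) is always even


# xrange(100000) covers 0, 1, ..., 99999 (100000 terms)
assert arithmetic_sum(0, 99999, 1) == 4999950000
# multiples of 7 below 100000: 0, 7, ..., 99995 = 7 * 14285 (14286 terms)
assert arithmetic_sum(0, 99995, 7) == 714264285
```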

#### ** (1b) Loading a text file **

```python
# Check loading data with sc.textFile
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('netflow.csv')
fileName = os.path.join(baseDir, inputPath)

rawData = sc.textFile(fileName)
lineCount = rawData.count()

# If the text file didn't load properly an AssertionError will be raised
assert lineCount == 105365
```

### ** Part 2: Parse IPFIX Data **

IPFIX allows network devices to collect statistics about network traffic at each interface; Cisco NetFlow is a proprietary version of this standard. We have provided Cisco NetFlow data that we collected at Princeton's border router. The data is "unsampled"; in other words, the flow statistics account for every packet that traverses any interface on the border router.

We used the `nfdump` tool to process the raw NetFlow data that the router collected. Each line, except for the header on top and the summary information at the bottom, logs the following information for a flow:
```
Date first seen,Date last seen,Duration, Proto, Src IP Addr, Src Pt,Dst IPAddr, Dst Pt, Packets, Bytes, Flags, Input, Output, Router IP, Next-hop IP, BGP next-hop IP, 
Src AS, Dst AS, SMask, DMask
```
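Before writing a parser, it helps to pin down which column index holds each field. The sketch below pairs hypothetical field names with one record taken from `netflow.csv`. It assumes, based on the records in the file, that the date and the time of each timestamp occupy separate columns, which gives 22 fields per record rather than the 20 names listed above.

```python
# Hypothetical field names; the order follows the nfdump header above, with
# each timestamp split into separate date and time columns (an assumption
# based on the records in netflow.csv).
FIELDS = ['first_date', 'first_time', 'last_date', 'last_time', 'duration',
          'proto', 'srcip', 'srcport', 'dstip', 'dstport', 'packets', 'bytes',
          'flags', 'input', 'output', 'router_ip', 'next_hop_ip',
          'bgp_next_hop_ip', 'src_as', 'dst_as', 'smask', 'dmask']

sample = ('2015-10-29,06:04:48.934,2015-10-29,06:04:48.934,0.000,TCP,'
          '116.211.0.90,52704,128.112.186.67,9095,1,40,....S.,120,70,'
          '0.0.0.0,0.0.0.0,0.0.0.0,174,0,0,0')

values = sample.split(',')
record = dict(zip(FIELDS, values))                     # field name -> raw string
index = dict((name, i) for i, name in enumerate(FIELDS))  # field name -> column

print(index['bytes'], record['bytes'])
```

With this layout, `bytes` is column 11 and `flags` is column 12, and every value is still a string until you convert it (for example, with `int()` for byte counts).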

##### Exercise:  Parse the NetFlow data into a Spark data structure.
To process the data, we first need to create a Spark data structure called a resilient distributed dataset (RDD) with the name `flow_records`, where each entry in the RDD is a tuple:
```(tstart, srcip, srcport, dstip, dstport, bytes, flag, proto)```

An RDD is an immutable, partitioned collection of records that Spark can distribute across the machines in a cluster and operate on in parallel. In this assignment, you will manipulate your RDD locally within your virtual machine, but all of the code that you write to manipulate your RDD could, in principle, run unchanged on a cluster.


```python
import sys
import os
from test_helper import Test

import re
import datetime
from dateutil import parser

from pyspark.sql import Row

baseDir = os.path.join('data')
inputPath = os.path.join('netflow.csv')
logFile = os.path.join(baseDir, inputPath)

# In netflow.csv, the date and the time of "Date first seen" and "Date last seen"
# are separate comma-separated fields, so each flow record has 22 fields:
#  0 first-seen date, 1 first-seen time, 2 last-seen date, 3 last-seen time,
#  4 duration, 5 proto, 6 src IP, 7 src port, 8 dst IP, 9 dst port,
# 10 packets, 11 bytes, 12 flags, 13-21 remaining fields
NUM_FIELDS = 22

def parseLogLine(logline):
    """ Parse a line in the NetFlow log
    Args:
        logline (str): a line of text in the format:
        Date first seen,Date last seen,Duration, Proto, Src IP Addr, Src Pt,Dst IPAddr, Dst Pt, Packets,
        Bytes, Flags, Input, Output, Router IP, Next-hop IP, BGP next-hop IP, Src AS, Dst AS,
        SMask, DMask
    Returns:
        tuple: ((tstart, srcip, srcport, dstip, dstport, bytes, flag, proto), 1) for a valid line,
               or (logline, 0) for an invalid line (header, summary, or malformed line)
    """
    fields = logline.strip().split(',')
    if len(fields) != NUM_FIELDS:
        return (logline, 0)
    try:
        nbytes = int(fields[11])
    except ValueError:
        # e.g., a header row, where this column holds a field name
        return (logline, 0)
    tstart = fields[0] + ' ' + fields[1]  # first-seen date and time
    return ((tstart, fields[6], fields[7], fields[8], fields[9],
             nbytes, fields[12], fields[5]), 1)


def parseNetflow():
    """ Read the NetFlow log with Spark and parse it into an RDD of flow tuples """
    parsed = sc.textFile(logFile).map(parseLogLine).cache()
    # Keep only the tuples from lines that parsed successfully
    flow_records = parsed.filter(lambda s: s[1] == 1).map(lambda s: s[0]).cache()
    failed = parsed.filter(lambda s: s[1] == 0).map(lambda s: s[0])
    print 'Read %d lines: %d flow records, %d lines not parsed' % (
        parsed.count(), flow_records.count(), failed.count())
    return flow_records


flow_records = parseNetflow()
print "Number of flow records: ", flow_records.count()
print "Sample flow entries: ", flow_records.take(2)
```

### ** Part 3: Analyze NetFlow Data **

In this part of the assignment, you will learn how to apply MapReduce techniques to analyze NetFlow data. There are many network traffic analysis questions you can answer with MapReduce. Counting the items that match a particular property (e.g., all flows destined to a particular destination IP address) is a natural fit for MapReduce, because the data can be distributed across workers, and the partial counts from each worker can be summed in the "reduce" phase.

Here, we will focus on using MapReduce analysis to identify popular IP addresses from the NetFlow data.
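To see why counting parallelizes well, here is a plain-Python simulation of what a per-key count does conceptually (it is not Spark's implementation): each worker counts the flows in its own partition, and the partial counts are then merged key by key. Spark's `reduceByKey` likewise combines values within each partition before shuffling them. The partitions, the `toy_flow` helper, and all addresses are made up for illustration.

```python
from collections import Counter


def toy_flow(dstip, nbytes):
    """Made-up flow tuple in the (tstart, srcip, srcport, dstip, dstport,
    bytes, flag, proto) layout used in this assignment."""
    return ('2015-10-29 06:04:48.934', '192.0.2.1', '443', dstip,
            '51000', nbytes, '.A....', 'TCP')


def count_partition(partition):
    """Work done on one worker: count this partition's flows per dstip."""
    return Counter(flow[3] for flow in partition)


def merge_counts(partials):
    """Reduce step: add the per-worker partial counts key by key."""
    total = Counter()
    for partial in partials:
        total.update(partial)   # Counter.update adds counts rather than replacing them
    return total


# Two "partitions", as if the records had been split across two workers.
partition_a = [toy_flow('128.112.0.1', 40), toy_flow('128.112.0.2', 60)]
partition_b = [toy_flow('128.112.0.1', 100), toy_flow('128.112.0.1', 20)]

counts = merge_counts([count_partition(partition_a), count_partition(partition_b)])
print(counts)
```

Because addition is associative and commutative, the merged result is the same no matter how the records are split across partitions.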

#### **Popular source and destination IP addresses at Princeton's border router**
You will analyze the network flows observed at Princeton's border router to determine the popular source and destination IP addresses. We can use two metrics to determine popular IP addresses: (1) number of flows, and (2) traffic volume.
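The plotting cells in this part select the top-K entries with `takeOrdered(K, key=lambda x: -x[1])`: negating the count makes the smallest keys correspond to the largest counts. The plain-Python sketch below mimics that behavior on a local list with `heapq.nsmallest`, which returns the k smallest items without sorting the whole collection; the helper name `take_ordered` and the counts are hypothetical.

```python
import heapq


def take_ordered(pairs, k, key):
    """Local stand-in for RDD.takeOrdered(k, key): the k smallest items by key."""
    return heapq.nsmallest(k, pairs, key=key)


# Made-up (IP address, # of flows) pairs
counts = [('128.112.0.1', 3), ('128.112.0.2', 1), ('128.112.0.3', 7)]

# Negating the count turns "k smallest" into "k largest"
top2 = take_ordered(counts, 2, key=lambda x: -x[1])
print(top2)
```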

##### ** Exercise: Determine popular IP addresses by number of flows.**

```python
# TODO: RDD where each entry is a tuple (dstip, total # of flows)
dstipToCount = <>
# TODO: RDD where each entry is a tuple (srcip, total # of flows)
srcipToCount = <>

# Select the top K=50 entries from the computed RDDs.
# Use the same value of K for all cells below.
K = 50

# TODO: Popular destination IP addresses (top-K) by count
topKdstipToCount = <>
# TODO: Popular source IP addresses (top-K) by count
topKsrcipToCount = <>

print 'Top 5 popular destination IP addresses', topKdstipToCount[:5]
print 'Top 5 popular source IP addresses', topKsrcipToCount[:5]
```

##### **Plot the distribution of popular IP addresses.**

We will now plot the number of flows for the top-K source (black) and destination (red) IP addresses.

```python
import matplotlib.pyplot as plt
%matplotlib inline

# Distribution of top-K popular IP addresses by # of flows
# (black: source addresses, red: destination addresses)
value1 = [int(x[1]) for x in topKsrcipToCount]
value2 = [int(x[1]) for x in topKdstipToCount]
fig = plt.figure(figsize=(8,3))
plt.plot(range(len(value1)), value1, color = 'k')
plt.plot(range(len(value2)), value2, color = 'r')
plt.xlabel('Top-K IP Addresses')
plt.ylabel('# of Flows')
pass
```

##### **Exercise: Determine popular IP addresses by volume.**
In this exercise, you will determine the popular source and destination IP addresses by volume. 

*Hint:* The 6th entry (index 5) of every flow record tuple is the number of bytes transferred.

```python
# TODO: RDD where each entry is a tuple (dstip, total bytes)
dstipToVolume = <>
# TODO: RDD where each entry is a tuple (srcip, total bytes)
srcipToVolume = <>

# TODO: Popular destination IP addresses (top-K) by volume
topKdstipToVolume = <>
# TODO: Popular source IP addresses (top-K) by volume
topKsrcipToVolume = <>

print 'Top 5 popular destination IP addresses', topKdstipToVolume[:5]
print 'Top 5 popular source IP addresses', topKsrcipToVolume[:5]
```

##### **Plot the distribution of popular IP addresses by traffic volume.**

You will now plot two figures with the same layout: the first shows the top-K source (black) and destination (red) IP addresses by number of flows, and the second shows them by traffic volume.

```python
# Distribution of top-K popular IP addresses by # of flows
value1 = [int(x[1]) for x in srcipToCount.takeOrdered(K, key = lambda x: -x[1])]
value2 = [int(x[1]) for x in dstipToCount.takeOrdered(K, key = lambda x: -x[1])]
fig = plt.figure(figsize=(8,3))
plt.plot(range(len(value1)), value1, color = 'k')
plt.plot(range(len(value2)), value2, color = 'r')
plt.xlabel('Top-K IP Addresses')
plt.ylabel('# of Flows')
pass

# Distribution of top-K popular IP addresses by volume
value1 = [int(x[1])/1000 for x in srcipToVolume.takeOrdered(K, key = lambda x: -x[1])]
value2 = [int(x[1])/1000 for x in dstipToVolume.takeOrdered(K, key = lambda x: -x[1])]
fig = plt.figure(figsize=(8,3))
plt.plot(range(len(value1)), value1, color = 'k')
plt.plot(range(len(value2)), value2, color = 'r')
plt.xlabel('Top-K IP Addresses')
plt.ylabel('Traffic Volume (KB)')
pass
```

### ** Part 4: Analyze Application Traffic**

In this part, you will analyze the NetFlow traffic logs to identify common application protocols by destination port. In practice, a network operator may want to identify popular applications (e.g., video streaming) to make provisioning plans, or to change network configurations to treat the traffic differently (e.g., to apply traffic shaping, to route the traffic on different links).

One of the difficulties, of course, is that many applications today run over port 80, so a port number alone does not reliably identify an application. We'll look at better ways to do application identification in future assignments.
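To make port numbers easier to read in the results, you could label them with their well-known services. The sketch below uses a small hand-written table of a few IANA well-known port assignments (illustrative, not exhaustive); `WELL_KNOWN_PORTS` and `port_label` are hypothetical names, and ports are kept as strings, matching the parsed flow tuples.

```python
# A few IANA well-known port assignments (illustrative, not exhaustive).
WELL_KNOWN_PORTS = {
    '22': 'SSH', '25': 'SMTP', '53': 'DNS', '80': 'HTTP',
    '123': 'NTP', '443': 'HTTPS',
}


def port_label(port):
    """Label a destination port string with its well-known service, if known."""
    name = WELL_KNOWN_PORTS.get(port)
    return '%s (%s)' % (port, name) if name else port


print(port_label('443'))
```

As noted above, a label such as `80 (HTTP)` only says which protocol the port is registered for, not which application actually generated the traffic.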

```python
# TODO: RDD where each entry is a tuple (dstport, total # of flows)
dstportToCount = <>

# TODO: Popular destination ports (top-K) by count
topKdstportToCount = <>

print 'Top 5 popular destination ports', topKdstportToCount[:5]
```

##### **Plot distribution of top-K popular applications**

We will now plot the graph for the popular applications (destination ports).

```python
def pie_pct_format(value):
    """ Show a percentage label only for slices of at least 7% of the pie """
    return '' if value < 7 else '%.0f%%' % value

labels = [x[0] for x in topKdstportToCount]
count = flow_records.count()
fracs = [100*(float(x[1]) / count) for x in topKdstportToCount]

fig = plt.figure(figsize=(4.5, 4.5), facecolor='white', edgecolor='white')
colors = ['purple', 'lightskyblue', 'yellowgreen', 'gold', 'lightcoral', 'yellow',
         'orange','lightgreen','darkblue','pink']
explode = (0.05, 0.05, 0.1, 0, 0, 0,0.1, 0, 0, 0)
patches, texts, autotexts = plt.pie(fracs, labels=labels, colors=colors,
                                    #explode=explode,
                                    autopct=pie_pct_format,
                                    shadow=False,  startangle=250)
for text, autotext in zip(texts, autotexts):
    if autotext.get_text() == '':
        text.set_text('')  # If the slice is too small to fit a label, don't show its text label
plt.legend(labels, loc=(0.80, -0.1), shadow=True)
pass
```

#### **Analyze HTTP Traffic**

##### **Determine the top web servers that are sending traffic to Princeton. Treat ports 80 and 443 as web-server ports.**

```python
# TODO: RDD where each entry is a tuple (IP address, total bytes)
webTrafficToVolume = <>

# TODO: Popular web servers (top-K) by volume
topKwebServers = <>
print 'Top 5 popular web servers', topKwebServers[:5]
```
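One possible reading of this exercise: a flow comes from a web server sending to Princeton if its source port is 80 or 443, its source address is off campus, and its destination address is on campus (128.112.0.0/16). The plain-Python sketch below applies that predicate to a few toy flow tuples and sums bytes per server. It assumes the tuple layout `(tstart, srcip, srcport, dstip, dstport, bytes, flag, proto)` with ports as strings and bytes already converted to an integer; the function names are hypothetical.

```python
from collections import defaultdict

WEB_PORTS = ('80', '443')  # ports kept as strings, as in the parsed flow tuples


def is_web_to_campus(flow):
    """True if a flow tuple comes from port 80/443 off campus and goes to campus."""
    srcip, srcport, dstip = flow[1], flow[2], flow[3]
    return (srcport in WEB_PORTS
            and dstip.startswith('128.112.')
            and not srcip.startswith('128.112.'))


def web_volume_by_server(flows):
    """Sum the bytes (tuple index 5) that each web server sends to campus."""
    totals = defaultdict(int)
    for flow in flows:
        if is_web_to_campus(flow):
            totals[flow[1]] += flow[5]
    return dict(totals)


flows = [
    ('2015-10-29 06:04:48.942', '202.75.54.174', '443', '128.112.244.229', '51444', 44, '.A..S.', 'TCP'),
    ('2015-10-29 06:04:48.934', '116.211.0.90', '52704', '128.112.186.67', '9095', 40, '....S.', 'TCP'),
    ('2015-10-29 06:04:49.000', '203.0.113.7', '80', '128.112.10.20', '50000', 1500, '.AP...', 'TCP'),
]
print(web_volume_by_server(flows))
```

In Spark, the same predicate would go into a `filter`, followed by a per-key sum over `(srcip, bytes)` pairs.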

### ** Part 5: Analyze BGP Routing Tables **

Up to this point in the assignment, we have looked at top traffic flows by individual IP addresses, but a network operator might also want to know which other networks (i.e., autonomous systems) send traffic to, or receive traffic from, its network. From our previous lectures on peering and Internet business relationships, it should be clear why an operator might care about which ASes are sending traffic its way! This information can also help in investigating network attacks (e.g., identifying the sources of denial-of-service attacks), which we will explore in the next assignment.

In this part of the assignment, you will combine the NetFlow data we have already been exploring with BGP routing information to map each IP address to its origin AS. To do so, we'll use publicly available routing information from the RouteViews project.

The RouteViews project, as described above, allows network operators to obtain real-time information about the global routing system from the perspectives of several different autonomous systems around the Internet. The RouteViews servers act as software BGP routers, obtaining their BGP routing information via BGP sessions, just like any other router would learn BGP routes. The main difference between the RouteViews servers and other BGP-speaking routers is that the RouteViews servers do not forward any real Internet traffic.

RouteViews periodically dumps BGP routing tables (each called a Routing Information Base, or RIB) in a binary format called MRT. You can check the latest dumps [here](ftp://archive.routeviews.org/). We collected data from one such server and used the `bgpdump` tool to convert the data into a human-readable text format. The entries in the BGP RIB look like the ones shown below:
```
TIME: 03/07/16 02:00:00
TYPE: TABLE_DUMP_V2/IPV4_UNICAST
PREFIX: 0.0.0.0/0
SEQUENCE: 0
FROM: 185.44.116.1 AS47872
ORIGINATED: 03/06/16 20:27:05
ORIGIN: IGP
ASPATH: 47872 3356
NEXT_HOP: 185.44.116.1
COMMUNITY: 3356:2 3356:514 3356:2087 47872:1 47872:3356

TIME: 03/07/16 02:00:00
TYPE: TABLE_DUMP_V2/IPV4_UNICAST
PREFIX: 0.0.0.0/0
SEQUENCE: 0
FROM: 80.241.176.31 AS20771
ORIGINATED: 03/04/16 10:21:21
ORIGIN: IGP
ASPATH: 20771 1299
NEXT_HOP: 80.241.176.31
```

A BGP RIB may contain multiple entries for the same IP prefix, one for each peer from which the collector learned a route (for example, the two `0.0.0.0/0` entries above, learned from AS47872 and AS20771).

For this assignment, we kept a single entry per IP prefix and translated the data into a `csv` file with `;` as the delimiter. Each line lists the BGP attributes in the following order:
```
TIME, ORIGIN, FROM, SEQUENCE, ASPATH, PREFIX, NEXT_HOP
```
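The origin AS of a route is the last AS on its AS path (for the first entry above, `47872 3356`, that is AS3356). A minimal plain-Python sketch of turning one semicolon-separated line into a `(prefix, origin AS)` pair follows. The example line is hypothetical, built from the first sample entry above; the exact formatting of each field in `bgp_rib.csv` is an assumption, and the function names are our own.

```python
def origin_as(aspath):
    """Return the origin AS: the last AS number on the AS path."""
    # A path may end in an AS_SET instead of a single AS; this sketch
    # returns the last whitespace-separated token unchanged in that case.
    return aspath.split()[-1]


def parse_rib_csv_line(line):
    """Parse 'TIME;ORIGIN;FROM;SEQUENCE;ASPATH;PREFIX;NEXT_HOP' into
    (prefix, origin AS), or return None for a malformed line.

    Note: the ORIGIN column is the BGP ORIGIN attribute (e.g. IGP),
    not the origin AS."""
    fields = [f.strip() for f in line.strip().split(';')]
    if len(fields) != 7 or not fields[4]:
        return None
    return (fields[5], origin_as(fields[4]))


# Hypothetical line built from the first sample entry above
line = '03/07/16 02:00:00;IGP;185.44.116.1 AS47872;0;47872 3356;0.0.0.0/0;185.44.116.1'
print(parse_rib_csv_line(line))
```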

##### **Exercise: Parse the BGP routing table into a Spark RDD.**
You will now parse the BGP RIB that we have provided into a Spark RDD, where each entry in the RDD is a (prefix, source AS) tuple; the source AS is the AS that originated the route, i.e., the last AS on the AS path.

```python
inputBGPPath = os.path.join('bgp_rib.csv')
BGPFile = os.path.join(baseDir, inputBGPPath)

def parseBGPRibLine(logline):
    """ TODO: Parse a line in the BGP RIB file
    Args:
        logline (str): a line of text in the format:
        TIME; ORIGIN; FROM; SEQUENCE; ASPATH; PREFIX; NEXT_HOP
    Returns:
        tuple: (prefix, source AS)
    """


def parseBGPRib():
    """ TODO: Read and parse BGP data """
    bgp_records = <>

    return bgp_records
```

```python
bgp_records = parseBGPRib()
totalBGPRecords = bgp_records.count()
print "Total BGP records: ", totalBGPRecords
```

##### **Exercise: Jointly Analyze BGP & Netflow Data **

Finally, combine the NetFlow and BGP data to determine which ASes send and receive the most traffic observed at Princeton's border router, both by number of flows and by volume.
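Joining the two datasets requires mapping each flow's IP address to the most specific (longest) BGP prefix that contains it. Longest-prefix matching is not a plain key equality, so a direct `join` on IP address will not work. Below is a minimal plain-Python sketch, assuming the (prefix, AS) pairs fit in memory on one machine (in Spark you could, for example, ship such a table to the workers with `sc.broadcast`). The function names are hypothetical, and the ASNs come from the range reserved for documentation.

```python
def ip_to_int(ip):
    """Convert a dotted-quad IPv4 string to a 32-bit integer."""
    a, b, c, d = (int(x) for x in ip.split('.'))
    return (a << 24) | (b << 16) | (c << 8) | d


def prefix_mask(length):
    """Netmask for a prefix length, e.g. 16 -> 0xFFFF0000 (0 -> 0)."""
    return (0xFFFFFFFF << (32 - length)) & 0xFFFFFFFF


def build_prefix_table(prefix_as_pairs):
    """Index (prefix, AS) pairs as {prefix length: {network int: AS}}."""
    table = {}
    for prefix, asn in prefix_as_pairs:
        network, length = prefix.split('/')
        length = int(length)
        table.setdefault(length, {})[ip_to_int(network) & prefix_mask(length)] = asn
    return table


def longest_prefix_match(table, ip):
    """Return the AS of the most specific prefix containing ip, or None."""
    addr = ip_to_int(ip)
    for length in sorted(table, reverse=True):   # try the longest prefixes first
        asn = table[length].get(addr & prefix_mask(length))
        if asn is not None:
            return asn
    return None


table = build_prefix_table([('0.0.0.0/0', '64496'),
                            ('128.112.0.0/16', '64497'),
                            ('128.112.128.0/17', '64498')])
print(longest_prefix_match(table, '128.112.200.1'))
```

Each lookup probes at most one dictionary per distinct prefix length (at most 33 for IPv4), which keeps lookups fast even for a full routing table.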

```python
# TODO: list where each entry is a tuple (source AS, total # of flows)
topKSrcASNToCount = []
# TODO: list where each entry is a tuple (destination AS, total # of flows)
topKDstASNToCount = []
# TODO: list where each entry is a tuple (source AS, total bytes)
topKSrcASNToVolume = []
# TODO: list where each entry is a tuple (destination AS, total bytes)
topKDstASNToVolume = []
print 'Top 5 popular source ASN by # of flows', topKSrcASNToCount[:5]
print 'Top 5 popular destination ASN by # of flows', topKDstASNToCount[:5]
print 'Top 5 popular source ASN by volume', topKSrcASNToVolume[:5]
print 'Top 5 popular destination ASN by volume', topKDstASNToVolume[:5]
```