# Graphistry Netflow Demo

In this example we take millions of rows of netflow (network traffic flow) data and search for anomalous activity within a network. We query 70M+ rows of network security data with BlazingSQL, then pass the aggregated result to Graphistry for visualization.

In [1]:
import os
import urllib.request  # needed by the download cell below
import cudf
import graphistry
from blazingsql import BlazingContext

### Download Data
The next cell checks for the existence of a `data` directory and downloads the data needed to run this demo (unless the file is already in the `data` directory).

In [2]:
# download netflow data
base_url = 'https://blazingsql-colab.s3.amazonaws.com/netflow_data/'
fn = 'nf-chunk2.csv'

# check if data directory exists
if not os.path.isdir('data'):
    print('creating blazingsql directory')
    # create the folder the download and the table path both use
    os.makedirs('data')
    
# check if we already have the file
if not os.path.isfile('data/' + fn):
    # we don't, so let us know we're downloading it now
    print(f'Downloading {base_url + fn} to data/{fn}')
    # save nf-chunk2 to data folder, may take a few minutes to download
    urllib.request.urlretrieve(base_url + fn, 'data/' + fn)
# we already have data
else:
    # let us know
    print(f'data/{fn} already downloaded')

creating blazingsql directory
data/nf-chunk2.csv already downloaded


## Blazing Context
Here we create a BlazingContext. You can think of the BlazingContext much like a Spark Context: it is where information such as the FileSystems you have registered and the Tables you have created is stored. If you have issues running this cell, restart the runtime and try running it again.

In [3]:
# connect to BlazingSQL
bc = BlazingContext(pool=False)

BlazingContext ready


### Create & Query Tables
In this next cell we identify the full path to the data.

In [4]:
# make wildcard path to load every matching netflow CSV file into blazingsql
path = f'{os.getcwd()}/data/nf*.csv'

# what's the path? 
path

'/home/winston@blazingdb.com/bsql-demos/data/nf*.csv'
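
Before creating a table from a wildcard path, it can help to check which files the pattern would pick up. The sketch below uses Python's standard `glob` module on a throwaway directory. The file names `nf-chunk1.csv` and `notes.txt` are hypothetical, and it is an assumption that BlazingSQL expands the wildcard the same way `glob` does.

```python
import glob
import os
import tempfile

# Build a temporary data directory with hypothetical file names.
with tempfile.TemporaryDirectory() as tmp:
    data_dir = os.path.join(tmp, "data")
    os.makedirs(data_dir)
    for name in ("nf-chunk1.csv", "nf-chunk2.csv", "notes.txt"):
        open(os.path.join(data_dir, name), "w").close()

    # Same pattern shape as the path above: every CSV whose name starts with "nf".
    pattern = os.path.join(data_dir, "nf*.csv")
    matched = sorted(os.path.basename(p) for p in glob.glob(pattern))

print(matched)
```

In this demo only `nf-chunk2.csv` is downloaded, so the pattern resolves to a single file.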

#### Create Table

In [5]:
%%time
# blazingsql table from file path
bc.create_table('netflow', path, header=0)

CPU times: user 15.1 ms, sys: 0 ns, total: 15.1 ms
Wall time: 13.1 ms


<pyblazing.apiv2.context.BlazingTable at 0x7f1b13b537b8>

In [6]:
%%time
# query the whole table 
bc.sql('SELECT * FROM netflow').tail()

CPU times: user 7.14 s, sys: 4.14 s, total: 11.3 s
Wall time: 10.8 s


Unnamed: 0,TimeSeconds,parsedDate,dateTimeStr,ipLayerProtocol,ipLayerProtocolCode,firstSeenSrcIp,firstSeenDestIp,firstSeenSrcPort,firstSeenDestPort,moreFragments,contFragments,durationSeconds,firstSeenSrcPayloadBytes,firstSeenDestPayloadBytes,firstSeenSrcTotalBytes,firstSeenDestTotalBytes,firstSeenSrcPacketCount,firstSeenDestPacketCount,recordForceOut
21526133,1365033000.0,2013-04-03 23:57:49,20130400000000.0,6,TCP,10.15.7.85,172.20.0.15,26886,80,0,0,11,19,503,297,619,5,2,0
21526134,1365033000.0,2013-04-03 23:57:49,20130400000000.0,6,TCP,10.15.7.85,172.20.0.15,27614,80,0,0,5,19,503,297,619,5,2,0
21526135,1365033000.0,2013-04-03 23:57:49,20130400000000.0,6,TCP,10.15.7.85,172.20.0.15,26887,80,0,0,11,19,503,297,619,5,2,0
21526136,1365033000.0,2013-04-03 23:57:49,20130400000000.0,6,TCP,10.15.7.85,172.20.0.15,27978,80,0,0,2,19,503,297,619,5,2,0
21526137,1365033000.0,2013-04-03 23:57:49,20130400000000.0,6,TCP,10.15.7.85,172.20.0.15,26888,80,0,0,11,19,503,297,619,5,2,0


#### Query
With the table made, we can simply run a SQL query.

We are going to run a `GROUP BY` aggregation to condense these millions of rows into thousands of rows, each one an edge between a source node and a destination node.

In [7]:
%%time
# what are we looking for 
query = '''
        SELECT
            firstSeenSrcIp as source,
            firstSeenDestIp as destination,
            count(firstSeenDestPort) as targetPorts,
            SUM(firstSeenSrcTotalBytes) as bytesOut,
            SUM(firstSeenDestTotalBytes) as bytesIn,
            SUM(durationSeconds) as durationSeconds,
            MIN(parsedDate) as firstFlowDate,
            MAX(parsedDate) as lastFlowDate,
            COUNT(*) as attemptCount
        FROM
            netflow
        GROUP BY
            firstSeenSrcIp,
            firstSeenDestIp
            '''
# run sql query (returns cuDF DataFrame)
gdf = bc.sql(query)

CPU times: user 5.17 s, sys: 3.1 s, total: 8.26 s
Wall time: 8.14 s


In [8]:
# how do the results look?
gdf.tail(25)

Unnamed: 0,source,destination,targetPorts,bytesOut,bytesIn,durationSeconds,firstFlowDate,lastFlowDate,attemptCount
18856,10.0.0.9,172.30.1.16,1,632,391,0,2013-04-03 11:00:13,2013-04-03 11:00:13,1
18857,172.20.1.100,10.0.0.13,71,32532,44310,7,2013-04-03 06:50:40,2013-04-03 11:17:30,71
18858,10.0.0.7,172.10.1.44,2,1264,782,0,2013-04-03 09:56:02,2013-04-03 10:21:18,2
18859,172.30.2.133,10.1.0.76,67,30202,41315,13,2013-04-03 07:05:51,2013-04-03 12:05:57,67
18860,172.30.1.101,10.0.0.10,78,35358,49320,2,2013-04-03 06:50:53,2013-04-03 11:59:00,78
18861,172.10.1.84,10.0.0.14,102,46422,63979,27,2013-04-03 06:49:55,2013-04-03 14:59:48,102
18862,10.0.0.14,172.30.1.2,3,1258,500,0,2013-04-03 09:58:52,2013-04-03 11:15:37,3
18863,172.10.1.136,10.0.0.5,74,33643,46606,9,2013-04-03 06:51:00,2013-04-03 14:52:21,74
18864,172.10.1.207,10.0.0.10,88,40304,55017,19,2013-04-03 06:48:47,2013-04-03 15:15:35,88
18865,10.1.0.75,172.10.1.13,1,633,392,0,2013-04-03 10:24:26,2013-04-03 10:24:26,1
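
To see how the `GROUP BY` collapses individual flows into one row per (source, destination) pair, here is a small sketch that runs a trimmed version of the query on a toy table. It uses Python's built-in `sqlite3` rather than BlazingSQL so it runs without a GPU, and the four flow rows are made up for illustration. In the rows shown above, `targetPorts` equals `attemptCount`, which is what `COUNT(column)` gives when the column has no nulls. The extra `distinctPorts` column is an assumption about what one might want instead, not part of the original query.

```python
import sqlite3

# Toy flows (made up): source IP, destination IP, destination port, bytes sent by source
flows = [
    ("10.0.0.1", "172.20.0.15", 80, 300),
    ("10.0.0.1", "172.20.0.15", 80, 200),
    ("10.0.0.1", "172.20.0.15", 443, 100),
    ("10.0.0.2", "172.20.0.15", 80, 50),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE netflow ("
    "firstSeenSrcIp TEXT, firstSeenDestIp TEXT, "
    "firstSeenDestPort INTEGER, firstSeenSrcTotalBytes INTEGER)"
)
conn.executemany("INSERT INTO netflow VALUES (?, ?, ?, ?)", flows)

# One output row per (source, destination) pair, i.e. one graph edge.
edges = conn.execute(
    """
    SELECT
        firstSeenSrcIp AS source,
        firstSeenDestIp AS destination,
        COUNT(firstSeenDestPort) AS targetPorts,             -- counts rows, not unique ports
        COUNT(DISTINCT firstSeenDestPort) AS distinctPorts,  -- counts unique ports
        SUM(firstSeenSrcTotalBytes) AS bytesOut,
        COUNT(*) AS attemptCount
    FROM netflow
    GROUP BY firstSeenSrcIp, firstSeenDestIp
    ORDER BY source, destination
    """
).fetchall()
conn.close()

for edge in edges:
    print(edge)
```

Four flows become two edges. The first edge has `targetPorts` of 3 but only 2 distinct ports, which shows the difference between the two counts.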


## BlazingSQL + Graphistry

In [9]:
%%time
# acquire your own key to make this run
graphistry.register(server='labs.graphistry.com', key=cudf.read_csv('gkey')['0'][0])

plotter = graphistry.bind(source="source", destination="destination")

# convert GPU DataFrame (gdf) .to_pandas() & visualize with Graphistry
plotter.plot(gdf.to_pandas())

CPU times: user 702 ms, sys: 15.8 ms, total: 718 ms
Wall time: 2.45 s
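
The `bind(source=..., destination=...)` call tells Graphistry which columns hold the two endpoints of each edge, so every distinct IP address in those columns becomes a node. As a rough illustration of that edge-to-node relationship, the plain-Python sketch below summarizes a few edges taken from the query output shown earlier. It is not how Graphistry works internally, and `summarize_nodes` is a hypothetical helper.

```python
from collections import defaultdict

# A few (source, destination, attemptCount) rows taken from the query output.
edges = [
    ("172.20.1.100", "10.0.0.13", 71),
    ("172.30.1.101", "10.0.0.10", 78),
    ("172.10.1.207", "10.0.0.10", 88),
]


def summarize_nodes(edge_rows):
    """Hypothetical helper: per-IP totals of outgoing and incoming attempts."""
    nodes = defaultdict(lambda: {"attemptsOut": 0, "attemptsIn": 0})
    for source, destination, attempts in edge_rows:
        nodes[source]["attemptsOut"] += attempts
        nodes[destination]["attemptsIn"] += attempts
    return dict(nodes)


nodes = summarize_nodes(edges)
for ip, totals in sorted(nodes.items()):
    print(ip, totals)
```

Three edges touch five distinct IP addresses. A node such as `10.0.0.10`, which receives traffic from several sources, is the kind of hub that tends to stand out in the visualization.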
