This notebook demonstrates how to use AWS Glue to prepare per-minute aggregates of VPC Flow Log data.

## License

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1586272817950_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
fl_bucket = '<S3 bucket containing flowlogs>'
acct = '<AWS account number>'
region = '<AWS region>'
df = spark.read.csv('s3://' + fl_bucket + '/AWSLogs/' + acct + '/vpcflowlogs/' + region + '/*/*/*/*.gz', 
                    sep=' ', header=True,
                   schema='version int , vpcid string , subnetid string , instanceid string , interfaceid string , account string , traffictype string , sourceaddr string , destinationaddr string , sourceport int , destport int , packetsourceaddress string , packetdestinationaddress string , protocol int , numbytes bigint , numpackets int , starttime int , endtime int , action string , tcpflags int , logstatus string ')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

919761

In [13]:
df.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[summary: string, version: string, vpcid: string, subnetid: string, instanceid: string, interfaceid: string, account: string, traffictype: string, sourceaddr: string, destinationaddr: string, sourceport: string, destport: string, packetsourceaddress: string, packetdestinationaddress: string, protocol: string, numbytes: string, numpackets: string, starttime: string, endtime: string, action: string, tcpflags: string, logstatus: string]

In [14]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- version: integer (nullable = true)
 |-- vpcid: string (nullable = true)
 |-- subnetid: string (nullable = true)
 |-- instanceid: string (nullable = true)
 |-- interfaceid: string (nullable = true)
 |-- account: string (nullable = true)
 |-- traffictype: string (nullable = true)
 |-- sourceaddr: string (nullable = true)
 |-- destinationaddr: string (nullable = true)
 |-- sourceport: integer (nullable = true)
 |-- destport: integer (nullable = true)
 |-- packetsourceaddress: string (nullable = true)
 |-- packetdestinationaddress: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- numbytes: long (nullable = true)
 |-- numpackets: integer (nullable = true)
 |-- starttime: integer (nullable = true)
 |-- endtime: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- tcpflags: integer (nullable = true)
 |-- logstatus: string (nullable = true)

In [None]:
df.head(2)

In [19]:
# Add timestamp
from pyspark.sql import functions as f
from pyspark.sql import types as t
df = df.withColumn('tstamp', df.starttime.cast(dataType=t.TimestampType()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- version: integer (nullable = true)
 |-- vpcid: string (nullable = true)
 |-- subnetid: string (nullable = true)
 |-- instanceid: string (nullable = true)
 |-- interfaceid: string (nullable = true)
 |-- account: string (nullable = true)
 |-- traffictype: string (nullable = true)
 |-- sourceaddr: string (nullable = true)
 |-- destinationaddr: string (nullable = true)
 |-- sourceport: integer (nullable = true)
 |-- destport: integer (nullable = true)
 |-- packetsourceaddress: string (nullable = true)
 |-- packetdestinationaddress: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- numbytes: long (nullable = true)
 |-- numpackets: integer (nullable = true)
 |-- starttime: integer (nullable = true)
 |-- endtime: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- tcpflags: integer (nullable = true)
 |-- logstatus: string (nullable = true)
 |-- tstamp: timestamp (nullable = true)

In [None]:
df.head(2)

We want to compute aggregate egress and ingress traffic grouped into 1 minute windows.

In [22]:
cidr = '10.0.0.0/16'
dfIngress = df.filter(df.destinationaddr.like('10.0%'))
dfEgress = df.filter(df.sourceaddr.like('10.0%'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
dfIngress.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

621898

In [18]:
dfEgress.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

573396

In [29]:
wIngress = dfIngress.groupBy(f.window("tstamp", "1 minute"), dfIngress.protocol).agg(f.sum("numbytes").alias('avgbytes'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
wIngress.head(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(window=Row(start=datetime.datetime(2020, 4, 6, 0, 28), end=datetime.datetime(2020, 4, 6, 0, 29)), protocol=6, avgbytes=2795.821052631579), Row(window=Row(start=datetime.datetime(2020, 4, 3, 3, 43), end=datetime.datetime(2020, 4, 3, 3, 44)), protocol=17, avgbytes=76.0)]

In [36]:
wIngress.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- protocol: integer (nullable = true)
 |-- avgbytes: double (nullable = true)
 |-- start: timestamp (nullable = true)

In [32]:
wEgress = dfEgress.groupBy(f.window("tstamp", "1 minute"), dfEgress.protocol).agg(f.sum("numbytes").alias('avgbytes'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
wIngress = wIngress.withColumn('start', wIngress.window.start).drop('window')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
wEgress = wEgress.withColumn('start', wEgress.window.start).drop('window')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
wIngress.repartition(1).write.csv('s3://' + bucket + '/ingress.csv')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
wEgress.repartition(1).write.csv('s3://' + bucket + '/egress.csv')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…