# VPC Flow Logs report
Goal: compare the configured network rules with the observed traffic, and harden security groups and NACLs based on that traffic.

# Setup

## Parameters and AWS session configuration

In [None]:
from PIL import Image
import io
import os
import time
import io
import numpy as np
import boto3
import os
import json
import pandas as pd
import matplotlib.pyplot as plt
import urllib.parse
import datetime
from boto3.session import Session
import ipaddress
import configparser
pd.options.display.width = 0


# --- Parameters -------------------------------------------------------------
# Every setting is read from an environment variable. Optional settings resolve
# to None (or to the default shown) when the variable is not set.
vpc_flow_log_bucket = os.environ.get('S3_FLOW_LOG_BUCKET')
# Each variable is looked up under its own name: the path was previously
# guarded by the *bucket* variable, which raised KeyError when only the bucket
# was exported.
vpc_flow_log_object_path = os.environ.get('S3_FLOW_LOG_PATH')
account_id = os.environ.get('ACCOUNT_ID')
aws_profile = os.environ.get('AWS_PROFILE')
aws_default_region = os.environ.get('AWS_DEFAULT_REGION')
# NOTE: the variable really is spelled ASSUME_RULE_PROFILE; the name is kept so
# existing environments keep working.
aws_assume_role_profile = os.environ.get('ASSUME_RULE_PROFILE')
aws_role_session_name = os.environ.get('AWS_ROLE_SESSION_NAME', 'vpc-flow-log-notebook')
aws_shared_cred_file = os.path.expanduser(
    os.environ.get('AWS_SHARED_CREDENTIALS_FILE', '~/.aws/credentials'))

# Fail early, and name what is missing.
required_settings = {
    'ACCOUNT_ID': account_id,
    'S3_FLOW_LOG_BUCKET': vpc_flow_log_bucket,
    'S3_FLOW_LOG_PATH': vpc_flow_log_object_path,
}
missing_settings = [name for name, value in required_settings.items() if not value]
if missing_settings:
    raise Exception("Not all required environment variables are set: {}".format(
        ", ".join(missing_settings)))

use_assume_rule = aws_assume_role_profile is not None

# boto3 reads AWS_DEFAULT_REGION from the environment, so the fallback is
# exported rather than only stored in a Python variable.
if not aws_default_region:
    os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'

# --- Session ----------------------------------------------------------------
if aws_profile:
    session = boto3.Session(profile_name=aws_profile)
else:
    session = boto3.session.Session()

if use_assume_rule:
    # The role to assume is described by a profile section in the shared
    # credentials file (role_arn, and optionally external_id).
    config = configparser.ConfigParser()
    config.read(aws_shared_cred_file)
    role_name = config[aws_assume_role_profile]['role_arn']
    external_id = config[aws_assume_role_profile].get('external_id')
    assume_role_args = {'RoleArn': role_name, 'RoleSessionName': aws_role_session_name}
    if external_id:
        # ExternalId is only sent when the profile defines one.
        assume_role_args['ExternalId'] = external_id
    creds = session.client('sts').assume_role(**assume_role_args)['Credentials']
    session = Session(aws_access_key_id=creds['AccessKeyId'],
                      aws_secret_access_key=creds['SecretAccessKey'],
                      aws_session_token=creds['SessionToken'])
    print("Assumed the role: " + role_name)

# The region list is needed by the ENI collection step whether or not a role
# was assumed, so it is computed unconditionally (it used to be defined only
# inside the assume-role branch).
regions = [region['RegionName'] for region in session.client('ec2').describe_regions()['Regions']]

In [None]:
# Open the VPC diagram shipped under ./assets; the bare `im` expression is the
# cell's last statement (presumably so the notebook renders the image).
im = Image.open("./assets/vpc_dag.png")
im

# Configuration research

## Collect ENI Configuration Data
Rationale: VPC flow logs are not mapped to security groups, so the ENI-to-security-group mapping has to be collected separately.

In [None]:
def get_eni_configuration():
    """Collect one row per (network interface, security group) pair.

    Iterates over the module-level ``regions`` list using the module-level
    ``session`` and pages through ``describe_network_interfaces`` so that
    regions holding more interfaces than fit in one response are not
    silently truncated.

    Returns:
        pandas.DataFrame indexed by ``"<eni id>_<group id>"`` with the columns
        NetworkInterfaceId, GroupId, PrivateIpAddress, GroupName,
        eniDescription, AvailabilityZone, VpcId and Region, sorted by
        NetworkInterfaceId.
    """
    columns = ['NetworkInterfaceId', 'GroupId', 'PrivateIpAddress', 'GroupName',
               'eniDescription', 'AvailabilityZone', 'VpcId', 'Region']
    result = {}
    for region_name in regions:
        paginator = session.client('ec2', region_name=region_name) \
            .get_paginator('describe_network_interfaces')
        for page in paginator.paginate(PaginationConfig={'PageSize': 1000}):
            for eni in page['NetworkInterfaces']:
                eni_id = eni['NetworkInterfaceId']
                for group in eni['Groups']:
                    sg_id = group['GroupId']
                    record_key = "{}_{}".format(eni_id, sg_id)
                    # .get() keeps interfaces that lack an optional field
                    # (stored as None) instead of aborting the whole scan.
                    result[record_key] = [
                        eni_id,
                        sg_id,
                        eni.get('PrivateIpAddress'),
                        group['GroupName'],
                        eni.get('Description'),
                        eni.get('AvailabilityZone'),
                        eni['VpcId'],
                        region_name,
                    ]

    return pd.DataFrame.from_dict(result, orient='index', columns=columns) \
        .sort_values(by=['NetworkInterfaceId'])

In [None]:
# Table of (ENI, security group) pairs across all regions; input to the
# security-group enrichment below.
eni_conf_table = get_eni_configuration()

## Collect Security Groups Configuration Data

### Private IP classification

In [None]:
# Open the private-IP classification diagram from ./assets; the bare `im`
# expression is the cell's last statement (presumably so the notebook renders it).
im = Image.open("./assets/private_ip_dag.png")
im

In [None]:
def is_in_private_subnet(ip,vpcs_cidrs):
    """Tell whether ``ip`` overlaps any CIDR block listed in ``vpcs_cidrs``.

    Args:
        ip: anything ``ipaddress.ip_network`` accepts (string or network object).
        vpcs_cidrs: mapping whose values are iterables of CIDR blocks; the
            keys are not inspected.

    Returns:
        True as soon as one block overlaps ``ip``, otherwise False.
    """
    every_block = (block for blocks in vpcs_cidrs.values() for block in blocks)
    return any(
        ipaddress.ip_network(ip).overlaps(ipaddress.ip_network(block))
        for block in every_block
    )

def cidr_ranges(rule,vpcs_cidrs):
    """Parse the CIDR ranges of a security-group rule and classify them.

    A rule is reported as private only when *every* one of its ranges is
    private. Previously the flag was overwritten on each iteration, so only
    the last range counted and a rule listing ``0.0.0.0/0`` followed by a
    private range was reported as private.

    An IPv4 range counts as private when it is not ``0.0.0.0/0`` and either
    ``ipaddress`` classifies it as private or it overlaps one of the VPC
    CIDR blocks. IPv6 ranges rely on ``ipaddress`` alone.

    Args:
        rule: permission dict with optional ``IpRanges`` (items holding
            ``CidrIp``) and ``Ipv6Ranges`` (items holding ``CidrIpv6``).
        vpcs_cidrs: mapping of VPC id to its CIDR blocks, passed through to
            ``is_in_private_subnet``.

    Returns:
        Tuple ``(IpRanges, Ipv6Ranges, is_ip_private, is_ipv6_private)``:
        two lists of ``ipaddress`` network objects and two booleans. A flag
        stays True when the rule has no ranges of that family.
    """
    any_ipv4 = ipaddress.ip_network('0.0.0.0/0')
    IpRanges = []
    Ipv6Ranges = []
    is_ip_private = True
    is_ipv6_private = True

    for cidr in rule.get('IpRanges', []):
        cidr_ip = ipaddress.ip_network(cidr['CidrIp'])
        IpRanges.append(cidr_ip)
        if cidr_ip == any_ipv4:
            # Checked explicitly: depending on the Python version,
            # 0.0.0.0/0 may be classified as private by `is_private`.
            range_is_private = False
        else:
            range_is_private = cidr_ip.is_private or is_in_private_subnet(cidr_ip, vpcs_cidrs)
        is_ip_private = is_ip_private and range_is_private

    for cidr in rule.get('Ipv6Ranges', []):
        cidr_ip = ipaddress.ip_network(cidr['CidrIpv6'])
        Ipv6Ranges.append(cidr_ip)
        is_ipv6_private = is_ipv6_private and cidr_ip.is_private

    return IpRanges, Ipv6Ranges, is_ip_private, is_ipv6_private

In [None]:
def add_sg_configuration(eni_conf_table):
    """Enrich the ENI/security-group table with one row per security-group rule.

    For every (ENI, security group) row, the group's ``ip_permissions`` are
    read through the module-level ``session`` and each rule becomes a record
    holding the allowed port, protocol, parsed CIDR ranges, privacy flags and
    referenced security groups.

    Args:
        eni_conf_table: DataFrame produced by ``get_eni_configuration``.

    Returns:
        pandas.DataFrame indexed by ``"<eni id>_<group id>_<port>"`` and sorted
        by NetworkInterfaceId. ``Port`` is the rule's ``FromPort``, ``'all'``
        for protocol ``-1``, or None when the rule carries no port.
        ``IsLimitedToAccount`` is True only when the rule references security
        groups and all of them belong to ``account_id``.
    """
    columns = ['NetworkInterfaceId', 'GroupId', 'Port', 'IsLimitedToAccount',
               'PrivateIpAddress', 'GroupName', 'eniDescription', 'AvailabilityZone',
               'VpcId', 'Region', 'IpProtocol', 'IpRanges', 'Ipv6Ranges',
               'is_ip_private', 'is_ipv6_private', 'sg_pairs']
    result = {}
    # Several ENIs usually share a security group; fetch each group's rules once.
    permissions_by_group = {}
    vpcs_cidrs = get_vpc_cidrs(eni_conf_table)

    # 'records' is the documented spelling of this orient; the short alias 'r'
    # is deprecated.
    for eni_conf in eni_conf_table.to_dict('records'):
        region = eni_conf['Region']
        group_id = eni_conf['GroupId']
        try:
            group_key = (region, group_id)
            if group_key not in permissions_by_group:
                permissions_by_group[group_key] = session.resource('ec2', region_name=region) \
                    .SecurityGroup(group_id).ip_permissions

            for rule in permissions_by_group[group_key]:
                protocol = rule['IpProtocol']
                # Resolved per rule so a rule without FromPort can never
                # inherit the port of the previous rule.
                allowed_port = rule.get('FromPort', 'all' if protocol == '-1' else None)

                group_pairs = rule['UserIdGroupPairs']
                sg_pairs = [pair['GroupId'] for pair in group_pairs]
                internal = bool(group_pairs) and all(
                    pair['UserId'] == account_id for pair in group_pairs)

                IpRanges, Ipv6Ranges, is_ip_private, is_ipv6_private = cidr_ranges(rule, vpcs_cidrs)

                # NOTE(review): rules sharing ENI, group and port (e.g. TCP and
                # UDP on the same port) map to the same key, so the later one
                # replaces the earlier one. Key format is kept unchanged
                # because it becomes the index consumed downstream.
                record_key = "{}_{}_{}".format(eni_conf['NetworkInterfaceId'], group_id, allowed_port)
                result[record_key] = [
                    eni_conf['NetworkInterfaceId'], group_id, allowed_port, internal,
                    eni_conf['PrivateIpAddress'], eni_conf['GroupName'],
                    eni_conf['eniDescription'], eni_conf['AvailabilityZone'],
                    eni_conf['VpcId'], region, protocol, IpRanges, Ipv6Ranges,
                    is_ip_private, is_ipv6_private, sg_pairs,
                ]
        except Exception as error:
            # Best effort: report the actual error (not the Exception class)
            # and continue with the next ENI.
            print(region, group_id, repr(error))
            continue

    return pd.DataFrame.from_dict(result, orient='index', columns=columns) \
        .sort_values(by=['NetworkInterfaceId'])

def get_vpc_cidrs(eni_conf_table):
    """Map every VPC referenced in ``eni_conf_table`` to its CIDR blocks.

    Args:
        eni_conf_table: DataFrame with at least ``VpcId`` and ``Region`` columns.

    Returns:
        dict of VPC id to a list of CIDR strings: the VPC's ``cidr_block``
        followed by every ``CidrBlock`` found in its
        ``cidr_block_association_set``, de-duplicated in first-seen order.
    """
    vpc_to_cidrs_dict = {}
    unique_vpcs = eni_conf_table[['VpcId', 'Region']].drop_duplicates()
    # 'records' is the documented spelling of this orient; the short alias 'r'
    # is deprecated.
    for vpc_tuple in unique_vpcs.to_dict('records'):
        vpc_id = vpc_tuple['VpcId']
        vpc_details = session.resource('ec2', region_name=vpc_tuple['Region']).Vpc(vpc_id)
        vpc_cidrs = [vpc_details.cidr_block]
        for cidr_association in vpc_details.cidr_block_association_set or []:
            if 'CidrBlock' in cidr_association:
                vpc_cidrs.append(cidr_association['CidrBlock'])
        # dict.fromkeys removes duplicates while keeping a stable order.
        vpc_to_cidrs_dict[vpc_id] = list(dict.fromkeys(vpc_cidrs))
    return vpc_to_cidrs_dict

### List network interface and security groups Details

In [None]:
# One row per (ENI, security group, rule), built from the ENI table above.
eni_sg_conf=add_sg_configuration(eni_conf_table)   

#### Enriched Data

In [None]:
# Show the enriched table ordered by network interface id.
eni_sg_conf.sort_values(by=['NetworkInterfaceId'])

### List public facing security groups

In [None]:
# Keep the rules flagged as not private for IPv4 and not limited to this account.
not_private_ipv4 = eni_sg_conf['is_ip_private'] == False
not_account_limited = eni_sg_conf['IsLimitedToAccount'] == False
public_enis = eni_sg_conf.loc[not_private_ipv4 & not_account_limited]

# Raise the display limit so the full selection can be shown without truncation.
pd.set_option('display.max_rows', len(public_enis.index) + 1)
public_enis.head()

In [None]:
# Distinct ENI ids and ports among the public-facing rules.
public_eni_id_values = public_enis['NetworkInterfaceId'].tolist()
public_port_values = public_enis['Port'].tolist()
unique_public_enis = list(set(public_eni_id_values))
unique_public_ports = list(set(public_port_values))

# The range columns are taken row by row as-is (no de-duplication is applied).
unique_public_ip_ranges = public_enis['IpRanges'].tolist()
unique_public_ipv6_ranges = public_enis['Ipv6Ranges'].tolist()

In [None]:
# Single-quote every value and join with commas; these strings are later
# spliced into the ARRAY[...] literals of the Athena view definition.
quoted_public_enis = ["'" + eni_id + "'" for eni_id in unique_public_enis]
quoted_public_ports = ["'" + str(port) + "'" for port in unique_public_ports]
public_enis_as_string = ",".join(quoted_public_enis)
public_ports_as_string = ','.join(quoted_public_ports)

print("Public facing ENIs: {}".format(public_enis_as_string))
print("Public facing ports: {}".format(public_ports_as_string))

# Flow research

## Reduce VPC Data
1. Select fields: account, interfaceid, destinationaddress, destinationport, protocol, action, sourceaddress, sourceport (remove columns: starttime, endtime, numpackets, numbytes, logstatus).
2. Remove duplicated rows (many columns are ignored, so the reduction should be significant).

## Athena configuration

In [None]:
# NOTE(review): this rebinds `session` to a plain profile session, replacing
# any assumed-role session created during setup — confirm that Athena and S3
# are meant to be accessed with the base profile.
session = boto3.Session(profile_name=aws_profile)
# S3 location of the raw flow logs, built from the bucket/path parameters.
s3_input = 's3://{}/{}'.format(vpc_flow_log_bucket,vpc_flow_log_object_path)
# Placeholder: bucket that receives the Athena query results.
s3_output_bucket_name = '<output_bucket_name>' 
s3_output_path = 's3://{}'.format(s3_output_bucket_name)
# Placeholders: Athena database and table names used by the statements below.
database = '<database_name>'
table = '<table_name>'
# Name of the view created over the flow-log table.
view_name = 'port_address_view'

In [None]:
#Function for executing athena queries
def run_query(session,query, s3_output, database=None):
    """Start an Athena query and return the ``start_query_execution`` response.

    The call only *starts* the query; it does not wait for completion.

    Args:
        session: object exposing ``client('athena')`` (a boto3 session).
        query: SQL text to execute.
        s3_output: S3 URI where Athena writes the query results.
        database: optional database used as the query execution context.

    Returns:
        The response dict, which contains ``QueryExecutionId``.
    """
    print("Executing query: {}".format(query))
    request = {
        'QueryString': query,
        'ResultConfiguration': {'OutputLocation': s3_output},
    }
    if database is not None:
        request['QueryExecutionContext'] = {'Database': database}

    response = session.client('athena').start_query_execution(**request)
    # Logged for every query; previously only the `database` branch printed it.
    print('Execution ID: ' + response['QueryExecutionId'])
    return response

def obtain_data(session, filename):
    """Download ``<filename>.csv`` from the results bucket as a DataFrame.

    The object is read from the bucket named by the module-level
    ``s3_output_bucket_name``. Any error is printed rather than raised.

    Args:
        session: object exposing ``resource('s3')`` (a boto3 session).
        filename: object key without the ``.csv`` suffix.

    Returns:
        The parsed pandas.DataFrame, or None when anything went wrong.
    """
    try:
        objectKey = filename + '.csv'
        print (objectKey)
        s3_object = session.resource('s3').Bucket(s3_output_bucket_name).Object(key= objectKey)
        payload = s3_object.get()['Body'].read()
        return pd.read_csv(io.BytesIO(payload), encoding='utf8')
    except Exception as e:
        print(e)
        return None
 
        


# Create Athena VPC flow log database and table definition
create_database = "CREATE DATABASE IF NOT EXISTS %s;" % (database)

# Create VPC flow log table in the created database.
# Columns follow the space-delimited flow-log record layout; destinationport is
# declared as a string so it can be compared with the quoted port list below.
create_table = \
    """CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
  `version` int,
  `account` string,
  `interfaceid` string,
  `sourceaddress` string,
  `destinationaddress` string,
  `sourceport` int,
  `destinationport` string,
  `protocol` int,
  `numpackets` int,
  `numbytes` bigint,
  `starttime` int,
  `endtime` int,
  `action` string,
  `logstatus` string
     )
     ROW FORMAT DELIMITED
     FIELDS TERMINATED BY ' '
     LOCATION '%s'
     TBLPROPERTIES ("skip.header.line.count"="1")""" % ( database, table, s3_input )

# Create the view (interfaceid, destinationport, destinationaddress, numbytes) from the VPC flow log table.
# - Flow-log records carry the action ACCEPT or REJECT, so the former
#   `action != 'REJECTED'` test never excluded anything; only accepted traffic
#   is kept now.
# - numbytes is summed per (interface, port, address). Grouping by numbytes
#   merged distinct flows that happened to carry the same byte count, which
#   under-reported traffic.
create_destip_destports_view = \
"""
CREATE OR REPLACE VIEW %s AS 
SELECT interfaceid, destinationport, destinationaddress, SUM(numbytes) AS numbytes
FROM %s
WHERE action = 'ACCEPT' AND contains (ARRAY[%s], interfaceid) AND contains (ARRAY[%s], destinationport)
GROUP BY interfaceid, destinationport, destinationaddress""" % (view_name, table, public_enis_as_string, public_ports_as_string)

## Create Athena database, table, and view (ENI, destination port, destination address, numbytes) from VPC flow logs

In [None]:
def wait_for_query(session, response, poll_seconds=2, timeout_seconds=900):
    """Block until the Athena query started by ``response`` has finished.

    Raises:
        Exception: if the query ends in FAILED/CANCELLED or does not finish
            within ``timeout_seconds``.
    """
    client = session.client('athena')
    query_id = response['QueryExecutionId']
    deadline = time.time() + timeout_seconds
    while True:
        status = client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']
        if status['State'] == 'SUCCEEDED':
            return
        if status['State'] in ('FAILED', 'CANCELLED'):
            raise Exception("Athena query {} ended in state {}: {}".format(
                query_id, status['State'], status.get('StateChangeReason', '')))
        if time.time() > deadline:
            raise Exception("Athena query {} did not finish within {} seconds".format(
                query_id, timeout_seconds))
        time.sleep(poll_seconds)


# Athena statements run asynchronously, so each step waits for the previous one
# to finish before the next is started (a fixed sleep after all three gave no
# such guarantee).

# creating the database if not exists
create_db_result = run_query(session,create_database, s3_output_path)
wait_for_query(session, create_db_result)

# Create the flowlogs table combining all collected data from the bucket
create_flow_logs_table = run_query(session,create_table, s3_output_path, database)
wait_for_query(session, create_flow_logs_table)

# Create flowlog view to query from
create_port_address_view = run_query(session,create_destip_destports_view, s3_output_path, database)
print(create_port_address_view)
wait_for_query(session, create_port_address_view)

## Query the view with AWS Athena and obtain the results from S3 bucket

In [None]:
# Query the view through the configured names; the database and view used to
# be hard-coded here and did not follow `database` / `view_name`.
select_view_query = "SELECT * FROM {}.{}".format(database, view_name)
result_file = run_query(session, select_view_query, s3_output_path, database)
file_name = result_file['QueryExecutionId']

# Poll the query state instead of sleeping a fixed 700 seconds: short queries
# return immediately and long ones are not read before they finish.
athena_client = session.client('athena')
while True:
    query_state = athena_client.get_query_execution(
        QueryExecutionId=file_name)['QueryExecution']['Status']['State']
    if query_state not in ('QUEUED', 'RUNNING'):
        break
    time.sleep(5)
if query_state != 'SUCCEEDED':
    raise Exception("Athena query {} finished in state {}".format(file_name, query_state))

# Athena stores the result as <QueryExecutionId>.csv under the output location.
destport_destaddress_table = obtain_data(session, file_name)
if destport_destaddress_table is None:
    # obtain_data prints the underlying error and returns None on failure.
    raise Exception("Could not download the result of Athena query {}".format(file_name))
destport_destaddress_table.head()

## Flatten CIDRs of ENI records

In [None]:
# One row per (rule, IPv4 range): explode the IpRanges lists and keep the
# previous index as an ordinary column named 'index'.
exploded_public_enis = public_enis.explode('IpRanges').reset_index()

# New row key: "<old index>_<range>".
flat_row_keys = exploded_public_enis['index'] + '_' + exploded_public_enis['IpRanges'].map(str)
exploded_public_enis['eni_sg_port_ipranges'] = flat_row_keys

public_enis_flat_cidrs = exploded_public_enis.set_index('eni_sg_port_ipranges').drop('index', axis=1)

In [None]:
# Preview the flattened (one IPv4 range per row) table.
public_enis_flat_cidrs.head()

In [None]:
from netaddr import IPNetwork, IPAddress

In [None]:
# Module-level accumulators updated by verify_address_port_in_use():
# running byte total, and one True/False entry appended per verified row.
total_traffic_bytes = 0
used_enis = []

def compute_cidrs_in_use(eni_id,ipv4_range,ipv6_range,flow_table=None,public_ports=None):
    """Check whether logged traffic for ``eni_id`` falls inside the given ranges.

    A flow row counts when its destination port is one of ``public_ports`` and
    its destination address lies in a range of the matching IP version.

    Each range argument may be a single ``ipaddress`` network, a collection of
    networks, or empty/None/NaN (treated as no range). Collections used to be
    tested with ``ip in <list>``, which compares an address with network
    objects and therefore never matched.

    Args:
        eni_id: network interface id to look up in the ``interfaceid`` column.
        ipv4_range: IPv4 network(s) allowed by the rule.
        ipv6_range: IPv6 network(s) allowed by the rule.
        flow_table: DataFrame with ``interfaceid``, ``destinationaddress``,
            ``destinationport`` and ``numbytes`` columns; defaults to the
            module-level ``destport_destaddress_table``.
        public_ports: ports considered public; defaults to the module-level
            ``unique_public_ports``.

    Returns:
        Tuple ``(cidr_in_use, bytes_per_cidr)``: whether any row matched and
        the sum of ``numbytes`` over the matching rows.
    """
    if flow_table is None:
        flow_table = destport_destaddress_table
    if public_ports is None:
        public_ports = unique_public_ports

    def as_networks(ranges):
        # Normalise one network / a collection / a missing value into a list.
        if isinstance(ranges, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
            return [ranges]
        if isinstance(ranges, (list, tuple, set)):
            return list(ranges)
        return []

    networks_by_version = {4: as_networks(ipv4_range), 6: as_networks(ipv6_range)}

    bytes_per_cidr = 0
    cidr_in_use = False
    for _, row in flow_table[flow_table['interfaceid'] == eni_id].iterrows():
        if int(row['destinationport']) not in public_ports:
            continue
        ip = ipaddress.ip_address(row['destinationaddress'])
        # Only networks of the address's own IP version are considered.
        if any(ip in network for network in networks_by_version[ip.version]):
            cidr_in_use = True
            bytes_per_cidr += int(row['numbytes'])
    return cidr_in_use,bytes_per_cidr

def verify_address_port_in_use(eni_record):
    """Check one flattened ENI record against the flow data and log the outcome.

    Appends True or False to the module-level ``used_enis`` list and, when the
    record's ranges saw traffic, adds the matched byte count to the
    module-level ``total_traffic_bytes``. Progress is printed along the way.

    Args:
        eni_record: row-like object providing ``IpRanges``, ``Ipv6Ranges``
            and ``NetworkInterfaceId``.
    """
    global total_traffic_bytes
    global used_enis

    ipv4_range, ipv6_range = eni_record['IpRanges'], eni_record['Ipv6Ranges']
    eni_id = eni_record['NetworkInterfaceId']

    print("Verifying ENI {}, CIDR {}".format(eni_id,ipv4_range))
    print("Total Bytes collected" , total_traffic_bytes)

    cidr_in_use,bytes_per_cidr = compute_cidrs_in_use(eni_id,ipv4_range,ipv6_range)
    print("Is in use: {}, Number of Bytes {}".format(cidr_in_use,bytes_per_cidr))

    used_enis.append(True if cidr_in_use else False)
    if not cidr_in_use:
        return

    total_traffic_bytes += bytes_per_cidr
    print("Total Bytes collected" , total_traffic_bytes)

In [None]:
# Start from clean accumulators so this cell can be re-run: leftovers from a
# previous run made `used_enis` longer than the table (the 'In Use' assignment
# then fails) and counted the traffic bytes twice.
total_traffic_bytes = 0
used_enis = []

print("Verifying {} ENIs".format(len(public_enis_flat_cidrs)))
# A plain loop calls the verifier exactly once per row; DataFrame.apply was
# used only for its side effects.
for _, eni_record in public_enis_flat_cidrs.iterrows():
    verify_address_port_in_use(eni_record)
public_enis_flat_cidrs['In Use'] = used_enis
public_enis_flat_cidrs.head()

## Summary

In [None]:
# Outer-join the flow rows with the public rules on (interface, port), so rows
# present on only one side are kept too.
enis_merge_flow_log = destport_destaddress_table.merge(
    public_enis,
    how='outer',
    left_on=['interfaceid', 'destinationport'],
    right_on=['NetworkInterfaceId', 'Port'],
)

In [None]:
# Counts of flattened rules with and without matching traffic.
in_use_column = public_enis_flat_cidrs['In Use']
rules_with_traffic = public_enis_flat_cidrs[in_use_column == True]
rules_without_traffic = public_enis_flat_cidrs[in_use_column == False]
print("Open ports that still in use: {} results found".format(len(rules_with_traffic)))
print("Open ports that are not in use: {} results found".format(len(rules_without_traffic)))
print("Total bytes transffered in used ports {}".format(total_traffic_bytes))

# Merged rows whose GroupId is null, i.e. flow rows with no matching rule.
flow_and_group = enis_merge_flow_log[['interfaceid', 'destinationport', 'GroupId']]
group_is_missing = flow_and_group['GroupId'].isnull()
were_in_use = flow_and_group.where(group_is_missing).dropna(how='all')
were_in_use_count = len(were_in_use)

print ("ENI and ports that were in use but not anymore (closed): {} results found".format(were_in_use_count))

# Past used ENIs and ports
The VPC flow logs show that these ENIs and ports carried traffic, but they are **not active** in the current configuration.

In [None]:
# Flow rows (interface, port) that have no matching rule in the merged table.
were_in_use

# Used ports
The VPC flow logs show that these ENIs and ports carried traffic, and they are still **active** in the current configuration.

In [None]:
# Rows flagged 'In Use', reduced to the identifying columns.
public_enis_flat_cidrs[public_enis_flat_cidrs['In Use']==True][['NetworkInterfaceId','Port','IpRanges','Ipv6Ranges','In Use']]