''' AWS discovery plugin '''
import time
import json
import requests
from import BaseDiscover
import core.logs
import boto3
class Discover(BaseDiscover):
''' Main Discover Class '''
def start(self):
''' Start Discovery '''
logs = core.logs.Logger(config=self.config, proc_name="")
logger = logs.getLogger()
logger = logs.clean_handlers(logger)"Getting hosts from AWS")
while True:
# Setup IP List
ip_addrs = []
# Connect to AWS
session = boto3.session.Session(
# Get Regions then connect to each and list instances
for region in session.get_available_regions('ec2'):
ec2 = session.client("ec2", region)
data = ec2.describe_instances()
for reservation in data['Reservations']:
for instance in reservation['Instances']:
# Check if filter should be public or private IP's
if 'filter' in self.config['discovery']['plugins']['aws']:
ip_types = self.config['discovery']['plugins']['aws']['filter']
else: # Default to both
ip_types = [ 'PrivateIPAddress', 'PublicIPAddress' ]
# Get IP's and Append to list
for ip_type in ip_types:
except Exception as e:
logger.debug("Failed to query AWS: {0}".format(e.message))
# Process found IP's
for ip in ip_addrs:
if self.dbc.new_discovery(ip=ip):
logger.debug("Added host {0} to discovery queue".format(ip))
logger.debug("Failed to add host {0} to discovery queue".format(ip))
logger.debug("Found {0} hosts".format(len(ip_addrs)))
if "unit_testing" in self.config.keys():
# Break out of loop for unit testing
# Return true for unit testing
return True