In [12]:
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import splunklib.client as client


In [10]:
import os
import boto3
import uuid
import json
import pymongo
import logging
from datetime import datetime
from pymongo.mongo_client import MongoClient

# Lambda function : STORE-SEARCH-VARIANT
# What it does    : stores a search-result variant in the database and
#                   enqueues a JSON file task for it
# Triggered by    : SNS topic "search-results"

# Root logger, configured once at module load.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.info("Initializing function...")

# Lazily created AWS service handles; they start out unset and are filled in
# by the code that first needs them.
ddb_resource = sqs_resource = None


def lambda_handler(event, context):
    """Process one SNS search-result notification.

    Reads the MongoDB connection settings and the SQS queue name from the
    environment, parses the JSON message of the first SNS record, extracts
    the IP address / ASN / organisation from the payload and tries to store
    the variant. A newly stored variant is followed by a JSON file task on
    the queue; an already existing one only has its counter incremented.

    Args:
        event: Lambda event; ``event["Records"][0]["Sns"]["Message"]`` must be
            a JSON string with the keys ``platform``, ``query_id``,
            ``query_name`` and ``data``.
        context: Lambda context object (not used).

    Returns:
        dict: ``statusCode`` (always 200), ``message`` (``"VARIANT_STORED"`` or
        ``"VARIANT_EXISTED"``) and ``code`` (always ``None``).

    Raises:
        KeyError: if an environment variable or message key is missing.
    """
    global sqs_resource

    # Configuration comes from the function's environment.
    mongo_uri = os.environ["mongodb_conn_string"]
    mongo_db_name = os.environ["mongodb_db_name"]
    task_queue_name = os.environ["sqs_json_file_tasks"]

    # Database handle.
    database = connect_db(mongo_uri, mongo_db_name)

    # The SQS resource is created on first use and kept in the module global.
    if not sqs_resource:
        sqs_resource = boto3.resource("sqs")
    task_queue = sqs_resource.get_queue_by_name(QueueName=task_queue_name)

    # Only the first record of the notification is read.
    sns_message = json.loads(event["Records"][0]["Sns"]["Message"])
    platform = sns_message["platform"].lower()
    query_id = sns_message["query_id"]
    query_name = sns_message["query_name"]  # or c2 name
    payload = sns_message["data"]

    ip_address, asn, org = extract_data(platform, payload)

    logger.info(payload)
    logger.info(f"Extracted data: {ip_address} | {asn} | {org}")

    # insert_variant reports False when the variant was already present.
    if insert_variant(database, ip_address, asn, org, query_id, query_name):
        outcome = "VARIANT_STORED"
        logger.info(f"Variant stored: {ip_address} | {query_name}")
        enqueue_json_file_task(
            task_queue, platform, query_id, query_name, ip_address, payload
        )
    else:
        outcome = "VARIANT_EXISTED"
        logger.info(f"Variant existed: {ip_address} | {query_name}")
        increment_variant(database, ip_address, query_id)

    response = {"statusCode": 200, "message": outcome, "code": None}
    logger.info(response)
    return response


def extract_data(platform, data):
    """Pull the IP address, ASN and organisation out of a search result.

    The field layout depends on the platform:

    * ``shodan``  - ``ip_str``, ``asn``, ``org`` at the top level
    * ``censys``  - ``ip`` at the top level; ``asn`` and ``name`` under
      ``autonomous_system``
    * ``zoomeye`` - ``ip`` at the top level; ``asn`` and ``organization``
      under ``geoinfo``

    Missing fields yield ``None``. A nested section that is absent or is not
    a dict (for example JSON ``null``) is treated as empty rather than raising
    ``TypeError``; the same applies when ``data`` itself is not a dict.

    Args:
        platform: Lower-case platform name.
        data: Search result payload for that platform.

    Returns:
        tuple: ``(ip_address, asn, org)``; all ``None`` for an unrecognised
        platform.
    """
    if not isinstance(data, dict):
        return None, None, None

    def nested(key):
        # A section that is present but null / not a mapping counts as empty.
        section = data.get(key)
        return section if isinstance(section, dict) else {}

    if platform == "shodan":
        return data.get("ip_str"), data.get("asn"), data.get("org")

    if platform == "censys":
        autonomous_system = nested("autonomous_system")
        return (
            data.get("ip"),
            autonomous_system.get("asn"),
            autonomous_system.get("name"),
        )

    if platform == "zoomeye":
        geoinfo = nested("geoinfo")
        return data.get("ip"), geoinfo.get("asn"), geoinfo.get("organization")

    return None, None, None


# MongoClient instances keyed by connection string. Keeping them at module
# level lets repeated calls reuse one client (and its connection pool)
# instead of opening a new, never-closed client on every call.
_mongo_clients = {}


def connect_db(conn_string, db_name):
    """Return a handle to the named MongoDB database.

    A ``MongoClient`` is created the first time a given connection string is
    seen and reused on later calls, mirroring how ``sqs_resource`` is cached.

    Args:
        conn_string: MongoDB connection string.
        db_name: Name of the database to select on the client.

    Returns:
        The database object ``client[db_name]``.
    """
    mongo_client = _mongo_clients.get(conn_string)
    if mongo_client is None:
        logger.info("Connecting to MongoDB...")
        # uuidRepresentation="standard" is needed because documents use
        # uuid.UUID values as their _id.
        mongo_client = MongoClient(conn_string, uuidRepresentation="standard")
        _mongo_clients[conn_string] = mongo_client

    return mongo_client[db_name]


def insert_variant(db, ip_address, asn, org, query_id, query_name):
    """Insert a new variant document into ``is_search_variants``.

    The document gets a random UUID as ``_id``, the current UTC date
    (``YYYY-MM-DD``) as ``dt_created``, the current UTC timestamp as
    ``ts_created`` and a ``counter`` starting at 1. ``query_name`` is stored
    under the key ``c2_name``.

    Returns:
        bool: ``True`` when the document was inserted, ``False`` when the
        insert raised ``DuplicateKeyError``.
    """
    now = datetime.utcnow()

    document = dict(
        _id=uuid.uuid4(),
        dt_created=now.strftime("%Y-%m-%d"),
        ip_address=ip_address,
        query_id=query_id,
        c2_name=query_name,
        asn=asn,
        org=org,
        counter=1,  # initial value
        ts_created=now,
    )

    try:
        db.is_search_variants.insert_one(document)
        return True
    except pymongo.errors.DuplicateKeyError:
        return False


def get_variant(db, ip_address, query_id):
    """Look up today's variant for an IP address and query.

    Searches ``is_search_variants`` for a document whose ``dt_created`` is the
    current UTC date (``YYYY-MM-DD``) and whose ``ip_address`` and ``query_id``
    match, projecting only ``_id``.

    Returns:
        Whatever ``find_one`` returns for that filter and projection.
    """
    today = datetime.utcnow().strftime("%Y-%m-%d")

    criteria = dict(dt_created=today, ip_address=ip_address, query_id=query_id)
    projection = {"_id": 1}

    return db.is_search_variants.find_one(criteria, projection)


def increment_variant(db, ip_address, query_id):
    """Bump the counter of today's variant for an IP address and query.

    Issues one ``update_one`` on ``is_search_variants`` that matches on the
    current UTC date (``dt_created``), ``ip_address`` and ``query_id``, sets
    ``ts_updated`` to the current UTC timestamp and increments ``counter``
    by 1. Nothing is returned.
    """
    now = datetime.utcnow()

    selector = dict(
        dt_created=now.strftime("%Y-%m-%d"),
        ip_address=ip_address,
        query_id=query_id,
    )
    changes = {
        "$set": {"ts_updated": now},
        "$inc": {"counter": 1},
    }

    db.is_search_variants.update_one(selector, changes)

In [None]:
def create_index(db):
    """Create the unique compound index on ``is_search_variants``.

    The index is named ``index_dt_ip_query`` and covers ``dt_created``,
    ``ip_address`` and ``query_id``, all ascending.
    """
    key_fields = ("dt_created", "ip_address", "query_id")
    index_keys = [(field, pymongo.ASCENDING) for field in key_fields]

    db.is_search_variants.create_index(
        index_keys,
        unique=True,
        name="index_dt_ip_query",
    )

In [None]:
def enqueue_json_file_task(queue, platform, query_id, query_name, ip_address, data):
    """Send a ``SEARCH`` JSON file task to the given queue.

    The message body is the JSON encoding of a dict with the keys ``type``
    (always ``"SEARCH"``), ``platform``, ``query_id``, ``query_name`` and
    ``data``. ``ip_address`` appears only in the log line, not in the message.

    Args:
        queue: Object exposing ``send_message(MessageBody=...)``.
        platform: Platform name.
        query_id: Identifier of the query.
        query_name: Name of the query.
        ip_address: IP address, used for logging.
        data: Payload to embed; must be JSON serialisable.
    """
    logger.info(f"Enqueueing JSON file task: {platform} | {ip_address}#{query_name}")

    task = dict(
        type="SEARCH",
        platform=platform,
        query_id=query_id,
        query_name=query_name,
        data=data,
    )
    body = json.dumps(task)
    queue.send_message(MessageBody=body)