In [None]:
import os
import json
import boto3
from datetime import datetime, timedelta, timezone
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import random

In [None]:
# Environment Variables (set in Lambda console or Terraform)
region = os.getenv("REGION", "us-gov-west-1")
host = os.getenv("OPENSEARCH_DOMAIN_ENDPOINT")  
index = "failed-login-alerts"


In [3]:
# AWS Auth
session = boto3.Session()
credentials = session.get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    "es",
    session_token=credentials.token
)

In [4]:
# OpenSearch client
es = OpenSearch(
    hosts=[{'host': host.replace("https://", ""), 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

In [None]:
def lambda_handler(event, context):
    now = datetime.now(timezone.utc)
    alerts =[]

    for i in range(10):
        user_id = f"synthetic-user-{i+1}"
        alert_time = now() - timedelta(minutes=i*3)
        failure_rate = round(random.uniform(0.75,0.95), 2)
        failure_count = random.randint(7,12)
        success_count = random.randint(0, 3)

        alert_doc = {
            "alert_id" = f"alert-{alert_time}-00{i+1}",
            "timestamp" = alert_time.isoformat() +"Z",
            "user_id" = user_id,
            "source_ip" = f"10.0.0.{100+i}",
            "failure_rate" = failure_rate,
            "parameters" : {
                "failure_threshold": 0.7,
                "lookback_minutes": 60,
                "lookahead_minutes": 60
            },

            "entity" : {
                "type": "user",
                "id": user_id, 
            },

            "metadata" : {
                "window_start" : (alert_time - timedelta(hours=1)).isoformat() + "Z",
                "window_end": (alert_time + timedelta(hours=1)).isoformat() + "Z",
                "success_count": success_count,
                "failure_count": failure_count,
            },

            "correlated_even": {
                "event_id": f"event-{i}-1",
                "evebt_type": "failed_login",
                "timestamp": alert_time.isoformat() + "Z",
                "details": "multiple failed login attempts detected",
            },
            {
                "event_id": f"event-{i}-1",
                "evebt_type": "failed_login",
                "timestamp": alert_time.isoformat() + "Z",
                "details": "login failed with known password patterns",

            }         
        }

        res = es.index(index = index, body=alert_doc)
        alerts.append({
            "alert_id": alert_doc["alert_id"],
            "result": res.get("result"),
        })

        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "Synthetic alerts generated successfully",
                "alerts": alerts
            })
        }