# 1. Hello-world lambda function

In [3]:
import json
import logging

# Root logger, raised to INFO so the handler's logger.info(...) records are emitted.
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger()

def lambda_handler(event, context):
    """Compute the area of the rectangle described by the incoming event.

    The runtime hands ``event`` over as a Python dict; it must contain the
    keys ``length`` and ``width``. ``context`` is only used for its
    ``log_group_name`` attribute.

    Returns the result serialized as a JSON string, e.g. '{"area": 12}'.
    """
    dimensions = (event['length'], event['width'])
    area = calculate_area(*dimensions)
    print(f"The area is {area}")

    logger.info(f"CloudWatch logs group: {context.log_group_name}")

    return json.dumps({"area": area})
    
def calculate_area(length, width):
    """Return the area of a ``length`` x ``width`` rectangle."""
    area = length * width
    return area

# 2. Crypto price notifier

In [5]:
import json
import urllib.request

import boto3

# SNS topic that receives the alert; replace REGION and ACCOUNT_ID with real values.
SNS_TOPIC_ARN = 'arn:aws:sns:REGION:ACCOUNT_ID:crypto-alerts'  # replace

# CoinGecko coin id and quote currency queried by the handler.
COIN, CURRENCY = "ethereum", "usd"

# Alert when the price rises strictly above this value (in CURRENCY units).
THRESHOLD = 3000

def lambda_handler(event, context):
    """Fetch the current COIN price from CoinGecko and alert via SNS above THRESHOLD.

    Returns an API-Gateway-style response dict:
    - statusCode 200 with the price in ``body`` on success;
    - statusCode 500 with a JSON-encoded error string if the price could not
      be fetched/parsed, or if publishing the SNS alert failed.
    """
    url = f"https://api.coingecko.com/api/v3/simple/price?ids={COIN}&vs_currencies={CURRENCY}"
    try:
        # Bound the request: without a timeout a stalled connection hangs until
        # the function itself times out. Keep this below the Lambda timeout.
        with urllib.request.urlopen(url, timeout=3) as response:
            data = json.loads(response.read())
        price = data[COIN][CURRENCY]
        print(f"Current {COIN.upper()} price: ${price}")
        should_alert = price > THRESHOLD
    except Exception as e:
        print("Error:", str(e))
        return {
            'statusCode': 500,
            'body': json.dumps('Error fetching price')
        }

    # Kept outside the fetch `try` so an SNS failure is not reported as a
    # price-fetch failure.
    if should_alert:
        try:
            send_alert(price)
        except Exception as e:
            print("Error sending alert:", str(e))
            return {
                'statusCode': 500,
                'body': json.dumps('Error sending alert')
            }

    return {
        'statusCode': 200,
        'body': f'ETH Price: ${price}'
    }

def send_alert(price):
    """Publish an "ETH price is high" notification for ``price`` to SNS_TOPIC_ARN."""
    client = boto3.client('sns')
    client.publish(
        Message=f"ETH price is high: ${price}!",
        Subject="ETH Price Alert",
        TopicArn=SNS_TOPIC_ARN,
    )


# 3. DEX Arbitrage monitoring Lambda

This AWS Lambda function monitors the ETH price difference between two decentralized exchanges, Uniswap and Sushiswap. In this version, the centralized exchanges Coinbase (ETH-USD) and Binance (ETH-USDT) stand in for them.  
It uses a **Z-score statistical model** to detect arbitrage opportunities and logs them to multiple destinations for further processing.

The project demonstrates:
- Event-driven Lambda design
- AWS service integration
- Efficient DB connection reuse in a serverless environment
- Real-time data ingestion, statistical processing, and alerting

---

## AWS Services Involved

| Service | Purpose |
| ------- | ------- |
| **AWS Lambda** | Core processing logic: fetching prices, computing Z-scores, orchestrating actions |
| **Amazon SSM Parameter Store** | Stores the Z-score threshold `/arbitrage/threshold`, supports live updates without redeployment |
| **Amazon DynamoDB** | Keeps the rolling history of recent price differences for statistical analysis |
| **Amazon RDS (PostgreSQL)** | Persistent storage of detected arbitrage opportunities |
| **Amazon S3** | Stores opportunity snapshots in JSON for audit & replay |
| **Amazon SQS** | Sends opportunity messages to downstream consumers |
| **Amazon CloudWatch** | Logs application output and publishes custom metrics |
| **External APIs** | Fetches ETH prices from Coinbase (ETH-USD) and Binance (ETH-USDT) as a proxy for DEX reads |

---

## Architecture & Flow

Below is a logical representation of the pipeline:

    ┌───────────────────────────────┐
    │   Event Sources               │
    │                               │
    │  1. SSM Parameter Change Event│─── updates cached threshold
    │  2. API Gateway POST /tick    │─── prices provided in request body
    │  3. EventBridge Scheduled Run │─── fetches prices from APIs
    └───────────────────────────────┘
                  │
                  ▼
         ┌───────────────────┐
         │  Lambda Handler   │
         └───────────────────┘
                  │
                  ▼
         ┌───────────────────┐
         │  fetch_prices()   │───> Coinbase / Binance APIs
         └───────────────────┘
                  │
                  ▼
         ┌─────────────────────────────┐
         │  process_prices()           │
         │  - Compute % diff           │
         │  - Update DynamoDB history  │
         │  - Compute Z-score          │
         └─────────────────────────────┘
                  │
     ┌────────────┴─────────────────┐
     │ Triggered? (|Z| ≥ threshold) │
     └────────────┬─────────────────┘
                  │ Yes
                  ▼
    ┌───────────────────────────────────────────┐
    │ Actions                                   │
    │                                           │
    │  1. Store JSON in S3                      │
    │  2. Insert row in RDS (PostgreSQL)        │
    │  3. Send message to SQS                   │
    │  4. Publish metric to CloudWatch          │
    └───────────────────────────────────────────┘



---

### Key Features

- **Z-score anomaly detection**:  
  Uses recent historical data to detect statistically significant deviations.
  
- **Cached parameter retrieval**:  
  The SSM Parameter Store value is cached in a module-level variable, so warm invocations skip the SSM call. It is refreshed every 24 hours, and a parameter-change event can update it sooner.

- **DB connection reuse**:  
  PostgreSQL connection is kept alive across warm invocations to improve performance.

- **Multi-destination logging**:  
  Opportunities are logged to RDS, S3, SQS, and CloudWatch for durability, analytics, and alerting.

---

### Example Z-score Calculation

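```text
diff_pct = |uni_price - sushi_price| / avg_price     # avg_price = (uni_price + sushi_price) / 2
z_score  = (diff_pct - mean(diff_history)) / stddev(diff_history)
if abs(z_score) >= threshold -> trigger actions
```

Here `diff_pct` is a fraction (0.01 = 1%). `diff_history` holds the most recently stored diffs, read before the current one is saved, and `stddev` is the population standard deviation. No Z-score is computed until at least `MIN_HISTORY` (default 20) diffs exist, or while `stddev` is 0.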

---

## Possible Extensions

- Replace Coinbase/Binance price sources with **on-chain Uniswap/Sushiswap contract calls** using Web3.
- Add **SNS** or **Slack webhook** for human alerts.
- Implement **auto-trading strategy** on detected opportunities.
- Store historical data in **Amazon Timestream** for time-series analytics.


In [None]:
import json
import boto3
import pg8000
import os
import statistics
from datetime import datetime
import time
from decimal import Decimal
from boto3.dynamodb.conditions import Key
import json, urllib.request
import logging

# Logging setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS clients
# Use the region the function actually runs in (Lambda sets AWS_REGION), so a
# deployment outside eu-north-1 does not silently talk to eu-north-1 endpoints.
# Fall back to eu-north-1 for local/notebook runs, where boto3 would otherwise
# raise NoRegionError.
AWS_DEFAULT_REGION = (
    os.environ.get('AWS_REGION')
    or os.environ.get('AWS_DEFAULT_REGION')
    or 'eu-north-1'
)
boto3.setup_default_session(region_name=AWS_DEFAULT_REGION)

# Created once at module load and reused by later invocations in this process.
ssm = boto3.client('ssm')
s3 = boto3.client('s3')
sqs = boto3.client('sqs')
cloudwatch = boto3.client('cloudwatch')
dynamodb = boto3.resource('dynamodb')

# Required env variables set in Lambda. A missing one raises KeyError at
# import time, so misconfiguration fails fast.
# NOTE(review): RDS_PASS is read as plain text; consider AWS Secrets Manager.
RDS_HOST = os.environ['RDS_HOST']
RDS_DB = os.environ['RDS_DB']
RDS_USER = os.environ['RDS_USER']
RDS_PASS = os.environ['RDS_PASS']
S3_BUCKET = os.environ['S3_BUCKET']
SQS_URL = os.environ['SQS_URL']

# Cached threshold values (avoid hitting SSM too often)
_cached_threshold = None  # float once loaded
_cached_version = None    # SSM parameter version of the cached value
_cached_expires = 0       # epoch seconds after which the cache is stale
CACHE_TTL_SEC = 86400  # refresh once a day unless change event updates it

# Minimum number of stored diffs required before a Z-score is computed.
MIN_HISTORY = int(os.getenv("MIN_HISTORY", "20"))

# Global RDS connection handle for reuse between Lambda invocations
_DB_CONN = None

def get_db_conn():
    """
    Return a usable pg8000 connection, reusing the module-level ``_DB_CONN``.

    A cached connection is health-checked with ``SELECT 1``. If that fails,
    the connection is closed (best effort) and a new one is opened. The new
    connection uses RDS_HOST, RDS_DB, RDS_USER, RDS_PASS and the optional
    RDS_PORT (default 5432) and DB_TIMEOUT (default 15) environment variables.
    """
    global _DB_CONN
    if _DB_CONN is not None:
        try:
            # Check if still alive; always release the probe cursor.
            cur = _DB_CONN.cursor()
            try:
                cur.execute("SELECT 1")
            finally:
                cur.close()
            return _DB_CONN
        except Exception:
            logger.warning("Cached DB connection failed health check; reconnecting")
            # If it is dead, close and reset. Best effort: it may already be gone.
            try:
                _DB_CONN.close()
            except Exception:
                pass
            _DB_CONN = None

    # Create a new connection: none was cached, or the cached one was dead.
    _DB_CONN = pg8000.connect(
        host=RDS_HOST,
        database=RDS_DB,
        user=RDS_USER,
        password=RDS_PASS,
        port=int(os.environ.get("RDS_PORT", "5432")),
        timeout=int(os.environ.get("DB_TIMEOUT", "15"))
    )
    return _DB_CONN


def get_z_threshold():
    """
    Return the Z-score threshold stored in SSM parameter ``/arbitrage/threshold``.

    The value is memoised in module globals for CACHE_TTL_SEC seconds (24h by
    default); within that window no SSM call is made. On a miss, the parameter
    is fetched and parsed as a float, and the cached value, version and expiry
    are refreshed.
    """
    global _cached_threshold, _cached_version, _cached_expires
    current = time.time()
    cache_valid = _cached_threshold is not None and current < _cached_expires
    if cache_valid:
        return _cached_threshold

    param = ssm.get_parameter(Name='/arbitrage/threshold')['Parameter']
    threshold = float(param['Value'])
    _cached_threshold, _cached_version, _cached_expires = (
        threshold, param['Version'], current + CACHE_TTL_SEC)
    return threshold

def _http_get_json(url, timeout=3):
    """
    Fetch JSON from a given URL with a custom User-Agent.
    """
    req = urllib.request.Request(url, headers={"User-Agent": "price-bot/1.0"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def fetch_prices():
    """
    Return current ETH prices as the tuple ``(uniswap_price, sushiswap_price)``.

    Coinbase ETH-USD spot stands in for Uniswap, and Binance ETHUSDT stands in
    for Sushiswap. Both values are floats.
    TODO: swap in on-chain reads later.
    """
    # Coinbase ETH-USD: price string lives at data.amount
    coinbase_price = float(
        _http_get_json("https://api.coinbase.com/v2/prices/ETH-USD/spot")["data"]["amount"]
    )
    # Binance ETHUSDT: price string lives at price
    binance_price = float(
        _http_get_json("https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT")["price"]
    )

    logger.info(f"Fetched prices — Coinbase: {coinbase_price}, Binance: {binance_price}")
    return coinbase_price, binance_price


# Default number of recent diffs read from DynamoDB as the Z-score baseline.
_HISTORY_LIMIT = 100


def _load_recent_diffs(table, limit):
    """Query up to ``limit`` stored diffs for pk='diff_history' and return them as floats.

    Items are requested in descending sort-key order (ScanIndexForward=False).
    Presumably that is newest first, if the sort key is the ISO timestamp;
    TODO confirm against the table schema.
    """
    response = table.query(
        KeyConditionExpression=Key('pk').eq('diff_history'),
        ScanIndexForward=False,
        Limit=limit
    )
    return [float(item["diff"]) for item in response.get("Items", [])]


def _insert_opportunity_row(timestamp, uniswap_price, sushiswap_price, z_score, z_threshold, diff_pct):
    """Insert one row into the RDS ``opportunities`` table and commit it.

    On failure the transaction is rolled back before re-raising. This keeps the
    cached connection from being left inside an aborted transaction.
    """
    conn = get_db_conn()
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO opportunities (timestamp, uniswap_price, sushiswap_price, z_score, threshold, diff_pct)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (timestamp, uniswap_price, sushiswap_price, z_score, z_threshold, diff_pct))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()


def _publish_opportunity(opportunity, z_score, z_threshold):
    """Send a detected opportunity to S3, RDS, SQS and CloudWatch, in that order.

    A failure at any step raises, and the remaining destinations are skipped.
    """
    timestamp = opportunity["timestamp"]
    diff_pct = opportunity["diff"]

    # Save to S3
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=f"arbitrage/{timestamp}.json",
        Body=json.dumps(opportunity)
    )

    # Insert to RDS
    _insert_opportunity_row(timestamp, opportunity["uniswap"], opportunity["sushiswap"],
                            z_score, z_threshold, diff_pct)

    # Send to SQS
    sqs.send_message(
        QueueUrl=SQS_URL,
        MessageBody=json.dumps(opportunity)
    )

    # Publish CloudWatch metric. diff_pct is a fraction (0.01 == 1%), so scale it
    # by 100 to match the declared 'Percent' unit.
    cloudwatch.put_metric_data(
        Namespace='DEXArbitrage',
        MetricData=[{
            'MetricName': 'ArbitrageOpportunity',
            'Value': diff_pct * 100,
            'Unit': 'Percent'
        }]
    )


def process_prices(uniswap_price, sushiswap_price, z_threshold):
    """
    Main logic:
    - Calculate the relative price difference between Uni and Sushi.
    - Store it in the DynamoDB table PriceDiffHistory.
    - Once at least MIN_HISTORY earlier diffs exist, Z-score the current diff
      against them. If |z| >= z_threshold, log the opportunity to S3, RDS,
      SQS and CloudWatch.

    Returns {"message": "Opportunity logged", "data": {...}} when triggered,
    otherwise {"message": "No opportunity"}.

    Raises ValueError if the mid price is not positive, because the relative
    difference is then undefined.
    """
    logger.info(f"[Prices] uni={uniswap_price:.6f} sushi={sushiswap_price:.6f} threshold={z_threshold}")

    avg_price = (uniswap_price + sushiswap_price) / 2
    if avg_price <= 0:
        raise ValueError(f"prices must be positive: uni={uniswap_price} sushi={sushiswap_price}")
    # Relative spread as a fraction of the mid price (0.01 == 1%), despite the "_pct" name.
    diff_pct = abs(uniswap_price - sushiswap_price) / avg_price

    table = dynamodb.Table('PriceDiffHistory')

    # Read the baseline before storing the current diff, so the current diff is
    # not compared against itself. Read at least MIN_HISTORY items; otherwise the
    # history check below could never pass.
    diffs = _load_recent_diffs(table, max(_HISTORY_LIMIT, MIN_HISTORY))

    # One timestamp for the history row and any opportunity records.
    timestamp = datetime.utcnow().isoformat()
    table.put_item(Item={
        "pk": "diff_history",
        "timestamp": timestamp,
        "diff": Decimal(str(diff_pct))
    })

    # `not diffs` also guards statistics.mean([]) when MIN_HISTORY <= 0.
    if not diffs or len(diffs) < MIN_HISTORY:
        logger.info(f"[Z] Not enough history yet ({len(diffs)}); skipping")
        return {"message": "No opportunity"}

    mean_diff = statistics.mean(diffs)
    std_diff = statistics.pstdev(diffs)
    if std_diff <= 0:
        # Every stored diff is identical, so the Z-score is undefined.
        logger.info(f"[Z] std=0 over {len(diffs)} diffs; skipping")
        return {"message": "No opportunity"}

    z_score = (diff_pct - mean_diff) / std_diff
    triggered = abs(z_score) >= z_threshold
    logger.info(f"[Z] diff={diff_pct:.6f} mean={mean_diff:.6f} std={std_diff:.6f} z={z_score:.3f} "
                f"trigger={triggered}")
    if not triggered:
        return {"message": "No opportunity"}

    logger.info(f"[OPP] Logging opportunity at {timestamp}")
    opportunity = {
        "timestamp": timestamp,
        "uniswap": uniswap_price,
        "sushiswap": sushiswap_price,
        "diff": diff_pct
    }
    _publish_opportunity(opportunity, z_score, z_threshold)
    return {"message": "Opportunity logged", "data": opportunity}



def lambda_handler(event, context):
    """
    Lambda entry point:
    - Handles SSM Parameter Store change events (refreshes the threshold cache).
    - Handles API Gateway /tick calls (JSON body with "uniswap"/"sushiswap").
    - Handles direct invokes carrying "uniswap"/"sushiswap" at the top level.
    - Handles scheduled calls (fetch prices directly).

    Returns an API-Gateway-style dict whose ``body`` is JSON. Supplied prices
    that are not positive, finite numbers are rejected with statusCode 400.
    """

    global _cached_threshold, _cached_expires  # needed when you mutate them!

    # SSM parameter change event (EventBridge) -> refresh the cache; never process prices.
    if isinstance(event, dict) and event.get('source') == 'aws.ssm':
        detail = event.get('detail') or {}
        if detail.get('name') != '/arbitrage/threshold':
            logger.info(f"Ignoring SSM event for parameter {detail.get('name')!r}")
            return {'statusCode': 200, 'body': json.dumps({'message': 'Event ignored'})}
        if 'value' in detail:
            _cached_threshold = float(detail['value'])
            _cached_expires = time.time() + CACHE_TTL_SEC
        else:
            # EventBridge "Parameter Store Change" events carry the parameter
            # name and operation, but not its value. Expire the cache and
            # re-read the parameter from SSM.
            _cached_expires = 0
            get_z_threshold()
        logger.info(f"Threshold cache updated to {_cached_threshold}")
        return {'statusCode': 200, 'body': json.dumps({'message': 'Threshold cache updated'})}

    z_threshold = get_z_threshold()

    # Handle different invocation sources
    prices = None
    if isinstance(event, dict):
        # API Gateway JSON body (a string), or an already-decoded dict
        raw_body = event.get('body')
        if raw_body:
            try:
                body = raw_body if isinstance(raw_body, dict) else json.loads(raw_body)
            except (TypeError, ValueError):
                logger.warning("Ignoring request body that is not valid JSON")
                body = None
            if isinstance(body, dict) and 'uniswap' in body and 'sushiswap' in body:
                prices = (body['uniswap'], body['sushiswap'])

        # Direct Lambda invoke with prices
        if prices is None and 'uniswap' in event and 'sushiswap' in event:
            prices = (event['uniswap'], event['sushiswap'])

    if prices is None:
        # Scheduled run (EventBridge) -> fetch our own prices
        uniswap_price, sushiswap_price = fetch_prices()
    else:
        try:
            uniswap_price, sushiswap_price = float(prices[0]), float(prices[1])
        except (TypeError, ValueError):
            uniswap_price = sushiswap_price = float('nan')
        # Also rejects NaN and inf: every comparison involving NaN is False.
        if not all(0 < p < float('inf') for p in (uniswap_price, sushiswap_price)):
            logger.warning(f"Rejecting invalid prices: {prices!r}")
            return {'statusCode': 400,
                    'body': json.dumps({'message': 'uniswap and sushiswap must be positive numbers'})}

    # Process and return result
    result = process_prices(uniswap_price, sushiswap_price, z_threshold)
    return {'statusCode': 200, 'body': json.dumps(result)}

NoRegionError: You must specify a region.