# AWS Services Connectivity Test

Tests connectivity to all required AWS services from your local machine.

**Services tested:**
- AWS Credentials (IAM)
- S3 (direct)
- Bedrock (direct)
- SSH tunnel to jumphost
- OpenSearch (via SSH tunnel)
- RDS (via SSH tunnel, optional)

**Prerequisites:**
1. `.env` file in `operations/` with credentials
2. SSH key at `~/.ssh/id_rsa` for jumphost access
3. Python packages installed (see next cell)

In [1]:
# Install required packages
!uv pip install boto3 opensearch-py pymysql sshtunnel paramiko python-dotenv requests-aws4auth -q

In [2]:
import os
import sys
import boto3
import json
from pathlib import Path
from dotenv import load_dotenv

# Load .env from operations folder
env_path = Path('../operations/.env')
if env_path.exists():
    load_dotenv(env_path)
    print("✓ Loaded .env file")
else:
    print("❌ .env file not found at ../operations/.env")
    print("   Create it: cp ../operations/.env.example ../operations/.env")

# Configuration - UPDATE THESE
AWS_REGION = os.getenv('AWS_DEFAULT_REGION', 'us-east-1')
S3_BUCKET = os.getenv('S3_BUCKET_NAME', 'your-bucket-name')
OPENSEARCH_ENDPOINT = 'https://vpc-hackathon-autobots-apse1-fbn25iam65pez2wksf4jhojbne.ap-southeast-1.es.amazonaws.com'  # UPDATE THIS
OPENSEARCH_USERNAME = os.getenv('OPENSEARCH_USERNAME')
OPENSEARCH_PASSWORD = os.getenv('OPENSEARCH_PASSWORD')

# SSH Tunnel Config
JUMPHOST = os.getenv('JUMPHOST', 'jumphost-sg.castlery.com')
JUMPHOST_USER = os.getenv('JUMPHOST_USER', 'ec2-user')
SSH_KEY_PATH = os.path.expanduser(os.getenv('SSH_KEY_PATH', '~/.ssh/id_rsa'))

print(f"\nConfiguration:")
print(f"  AWS Region: {AWS_REGION}")
print(f"  S3 Bucket: {S3_BUCKET}")
print(f"  OpenSearch: {OPENSEARCH_ENDPOINT}")
print(f"  Jumphost: {JUMPHOST}")
print(f"  SSH Key: {SSH_KEY_PATH}")

✓ Loaded .env file

Configuration:
  AWS Region: ap-southeast-1
  S3 Bucket: cslr-hackathon-sg-test
  OpenSearch: https://vpc-hackathon-autobots-apse1-fbn25iam65pez2wksf4jhojbne.ap-southeast-1.es.amazonaws.com
  Jumphost: jumphost-sg.castlery.com
  SSH Key: /Users/pillalamarrimallikarjun/OneDrive - Castlery Pte Ltd/workspace/Fun projects/autobots-semantic-search/autobots-shared-ssh-key


In [4]:
print("Testing AWS Credentials...")
print("=" * 60)

try:
    sts = boto3.client('sts', region_name=AWS_REGION)
    identity = sts.get_caller_identity()
    
    print("✓ AWS Credentials Valid")
    print(f"  Account: {identity['Account']}")
    print(f"  User/Role: {identity['Arn']}")
    
except Exception as e:
    print(f"❌ AWS Credentials Failed: {str(e)}")
    print("   Configure: aws configure")

Testing AWS Credentials...
❌ AWS Credentials Failed: An error occurred (SignatureDoesNotMatch) when calling the GetCallerIdentity operation: The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.
   Configure: aws configure


In [5]:
print("Testing S3 Access...")
print("=" * 60)

try:
    s3 = boto3.client('s3', region_name=AWS_REGION)
    response = s3.list_buckets()
    
    print(f"✓ S3 Access Successful")
    print(f"  Total buckets: {len(response['Buckets'])}")
    
    bucket_names = [b['Name'] for b in response['Buckets']]
    if S3_BUCKET in bucket_names:
        print(f"  ✓ Bucket '{S3_BUCKET}' found")
        
        objects = s3.list_objects_v2(Bucket=S3_BUCKET, MaxKeys=5)
        if 'Contents' in objects:
            print(f"  ✓ Can list objects")
            print(f"    Sample files:")
            for obj in objects['Contents'][:5]:
                print(f"      - {obj['Key']}")
    else:
        print(f"  ❌ Bucket '{S3_BUCKET}' not found")
        
except Exception as e:
    print(f"❌ S3 Access Failed: {str(e)}")

Testing S3 Access...
❌ S3 Access Failed: An error occurred (SignatureDoesNotMatch) when calling the ListBuckets operation: The request signature we calculated does not match the signature you provided. Check your key and signing method.


In [6]:
print("Testing Bedrock Access...")
print("=" * 60)

try:
    bedrock = boto3.client('bedrock', region_name=AWS_REGION)
    response = bedrock.list_foundation_models()
    
    print(f"✓ Bedrock Access Successful")
    print(f"  Total models: {len(response['modelSummaries'])}")
    
    required_models = [
        'amazon.titan-embed-text-v1',
        'amazon.titan-embed-image-v1',
        'anthropic.claude-3-sonnet'
    ]
    
    for model_id in required_models:
        found = any(model_id in m['modelId'] for m in response['modelSummaries'])
        status = "✓" if found else "❌"
        print(f"  {status} {model_id}")
        
except Exception as e:
    print(f"❌ Bedrock Access Failed: {str(e)}")
    print("   Enable Bedrock in AWS Console")

Testing Bedrock Access...
❌ Bedrock Access Failed: An error occurred (InvalidSignatureException) when calling the ListFoundationModels operation: The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.
   Enable Bedrock in AWS Console


In [7]:
print("Testing Bedrock Runtime (Generate Embedding)...")
print("=" * 60)

try:
    bedrock_runtime = boto3.client('bedrock-runtime', region_name=AWS_REGION)
    
    test_text = "grey sofa under $1000"
    body = json.dumps({"inputText": test_text})
    
    response = bedrock_runtime.invoke_model(
        modelId='amazon.titan-embed-text-v1',
        body=body,
        contentType='application/json',
        accept='application/json'
    )
    
    result = json.loads(response['body'].read())
    embedding = result['embedding']
    
    print(f"✓ Bedrock Runtime Successful")
    print(f"  Test text: '{test_text}'")
    print(f"  Embedding dimension: {len(embedding)}")
    print(f"  Sample values: {embedding[:5]}...")
    
except Exception as e:
    print(f"❌ Bedrock Runtime Failed: {str(e)}")

Testing Bedrock Runtime (Generate Embedding)...
❌ Bedrock Runtime Failed: An error occurred (InvalidSignatureException) when calling the InvokeModel operation: The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.

The Canonical String for this request should have been
'POST
/model/amazon.titan-embed-text-v1/invoke

accept:application/json
content-type:application/json
host:bedrock-runtime.ap-southeast-1.amazonaws.com
x-amz-date:20260128T044059Z

accept;content-type;host;x-amz-date
ebe8d5cbd226054cdca1b16f66ee7d06cdf8e305a538ef2c76ec3aedefd07f04'

The String-to-Sign should have been
'AWS4-HMAC-SHA256
20260128T044059Z
20260128/ap-southeast-1/bedrock/aws4_request
6c642557da826230e3d7786fead56c4ce6654a4f8712f994c84d453d52e21076'



In [8]:
print("Testing SSH Connection to Jumphost...")
print("=" * 60)

import paramiko

try:
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    print(f"Connecting to {JUMPHOST_USER}@{JUMPHOST}...")
    ssh.connect(
        hostname=JUMPHOST,
        username=JUMPHOST_USER,
        key_filename=SSH_KEY_PATH,
        timeout=10
    )
    
    print("✓ SSH Connection Successful")
    
    stdin, stdout, stderr = ssh.exec_command('hostname')
    hostname = stdout.read().decode().strip()
    print(f"  Jumphost hostname: {hostname}")
    
    ssh.close()
    
except FileNotFoundError:
    print(f"❌ SSH Key not found: {SSH_KEY_PATH}")
except paramiko.AuthenticationException:
    print(f"❌ SSH Authentication Failed")
    print(f"   Check: chmod 600 {SSH_KEY_PATH}")
except Exception as e:
    print(f"❌ SSH Connection Failed: {str(e)}")

Testing SSH Connection to Jumphost...
Connecting to autobots@jumphost-sg.castlery.com...
✓ SSH Connection Successful
  Jumphost hostname: ip-172-31-3-40


In [None]:
print("Testing OpenSearch via SSH Tunnel...")
print("=" * 60)

from sshtunnel import SSHTunnelForwarder
from opensearchpy import OpenSearch
import time

tunnel = None
try:
    opensearch_host = OPENSEARCH_ENDPOINT.replace('https://', '').replace('http://', '')
    opensearch_port = 443
    
    print(f"Creating SSH tunnel to {opensearch_host}:{opensearch_port}...")
    
    tunnel = SSHTunnelForwarder(
        (JUMPHOST, 22),
        ssh_username=JUMPHOST_USER,
        ssh_pkey=SSH_KEY_PATH,
        remote_bind_address=(opensearch_host, opensearch_port),
        local_bind_address=('127.0.0.1', 9200)
    )
    
    tunnel.start()
    print(f"✓ SSH Tunnel established (local port: {tunnel.local_bind_port})")
    
    time.sleep(2)
    
    client = OpenSearch(
        hosts=[{'host': '127.0.0.1', 'port': tunnel.local_bind_port}],
        http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD),
        use_ssl=True,
        verify_certs=False,
        ssl_show_warn=False
    )
    
    info = client.info()
    print(f"✓ OpenSearch Connection Successful")
    print(f"  Cluster: {info['cluster_name']}")
    print(f"  Version: {info['version']['number']}")
    
    indices = client.cat.indices(format='json')
    print(f"  Total indices: {len(indices)}")
    if indices:
        print(f"  Sample indices:")
        for idx in indices[:5]:
            print(f"    - {idx['index']} ({idx['docs.count']} docs)")
    
except Exception as e:
    print(f"❌ OpenSearch Connection Failed: {str(e)}")
finally:
    if tunnel:
        tunnel.stop()
        print("\n✓ SSH Tunnel closed")

In [None]:
print("=" * 60)
print("CONNECTIVITY TEST SUMMARY")
print("=" * 60)
print()
print("Review the output above:")
print("  ✓ = Success")
print("  ❌ = Failed")
print()
print("If all tests pass, you're ready to run the pipeline!")
print()
print("Next steps:")
print("  cd ../operations")
print("  python pipeline.py")