In [9]:
import boto3
import json
import time

# Where LocalStack listens (reachable from the host) and the region used for every client
ENDPOINT_URL = "http://localhost:4566"
REGION = "us-east-1"

# Names of the queues and the bucket used throughout the notebook
TASK_QUEUE_NAME = "task-queue"
NOTIFY_QUEUE_NAME = "notification-queue"
BUCKET_NAME = "results-bucket"

# One boto3 client per service, all pointed at the same endpoint and region.
# Clients are built in the order the names appear on the left-hand side.
sqs, s3, lambdas, iam = (
    boto3.client(service, endpoint_url=ENDPOINT_URL, region_name=REGION)
    for service in ("sqs", "s3", "lambda", "iam")
)

In [10]:
print("Creating Infrastructure...")

# Only "already exists" style errors are treated as benign below. Anything
# else (for example LocalStack not running) is allowed to surface instead of
# being reported as "might already exist".

# 1. Create S3 Bucket
try:
    s3.create_bucket(Bucket=BUCKET_NAME)
    print(f"‚úÖ S3 Bucket '{BUCKET_NAME}' created.")
except (s3.exceptions.BucketAlreadyExists, s3.exceptions.BucketAlreadyOwnedByYou) as e:
    print(f"‚ÑπÔ∏è Bucket might already exist: {e}")

# 2. Create SQS Queues
try:
    task_queue = sqs.create_queue(QueueName=TASK_QUEUE_NAME)
    notify_queue = sqs.create_queue(QueueName=NOTIFY_QUEUE_NAME)

    TASK_QUEUE_URL = task_queue['QueueUrl']
    NOTIFY_QUEUE_URL = notify_queue['QueueUrl']

    print(f"‚úÖ Task Queue URL: {TASK_QUEUE_URL}")
    print(f"‚úÖ Notify Queue URL: {NOTIFY_QUEUE_URL}")
except sqs.exceptions.QueueNameExists:
    # A queue with this name already exists (with different attributes):
    # look the URLs up instead of creating them.
    TASK_QUEUE_URL = sqs.get_queue_url(QueueName=TASK_QUEUE_NAME)['QueueUrl']
    NOTIFY_QUEUE_URL = sqs.get_queue_url(QueueName=NOTIFY_QUEUE_NAME)['QueueUrl']
    print(f"‚úÖ Queues fetched: {TASK_QUEUE_URL}")

# 3. Create IAM Role
# The trust policy lets the Lambda service assume the role.
try:
    role = iam.create_role(
        RoleName="lambda-role",
        AssumeRolePolicyDocument=json.dumps({
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]
        })
    )
    ROLE_ARN = role['Role']['Arn']
except iam.exceptions.EntityAlreadyExistsException:
    # Role was created by an earlier run: reuse it.
    ROLE_ARN = iam.get_role(RoleName="lambda-role")['Role']['Arn']
print(f"‚úÖ IAM Role ready: {ROLE_ARN}")

Creating Infrastructure...
‚úÖ S3 Bucket 'results-bucket' created.
‚úÖ Task Queue URL: http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/task-queue
‚úÖ Notify Queue URL: http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/notification-queue
‚úÖ IAM Role ready: arn:aws:iam::000000000000:role/lambda-role


In [11]:
import zipfile
import io
import os

def create_lambda_zip(app_dir='../app', lambdas_dir='../lambdas'):
    """Build the in-memory deployment archive shared by the Lambda functions.

    Archive layout:
      * every ``.py`` file under ``app_dir`` keeps its path relative to the
        parent of ``app_dir`` (e.g. ``app/processors.py``);
      * every ``.py`` file under ``lambdas_dir`` is stored at the archive
        root under its bare file name (e.g. ``task_lambda.py``).

    Directories and files are visited in sorted order so the entry order of
    the archive does not depend on the file system's listing order.

    Args:
        app_dir: Directory holding the shared package. Defaults to ``'../app'``.
        lambdas_dir: Directory holding the handler modules. Defaults to
            ``'../lambdas'``.

    Returns:
        bytes: The zip archive contents.

    Raises:
        FileNotFoundError: If ``app_dir`` or ``lambdas_dir`` is not an
            existing directory. Without this check ``os.walk`` would yield
            nothing and an archive missing that code would be produced.
    """
    for source_dir in (app_dir, lambdas_dir):
        if not os.path.isdir(source_dir):
            raise FileNotFoundError(
                f"Lambda source directory not found: {source_dir!r} "
                f"(current working directory: {os.getcwd()!r})"
            )

    # Archive names for the package are relative to the package's parent,
    # which is '..' for the default '../app'.
    package_parent = os.path.dirname(os.path.normpath(app_dir)) or os.curdir

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED, allowZip64=False) as zip_file:
        # 1. Add app module (place 'app' folder at root of zip)
        for root, dirs, files in os.walk(app_dir):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for file in sorted(files):
                if file.endswith('.py'):
                    # This creates structure: app/processors.py
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, start=package_parent)
                    zip_file.write(file_path, arcname)

        # 2. Add lambdas (place files at root of zip for handlers)
        for root, dirs, files in os.walk(lambdas_dir):
            dirs.sort()
            for file in sorted(files):
                if file.endswith('.py'):
                    # This creates structure: task_lambda.py
                    file_path = os.path.join(root, file)
                    zip_file.write(file_path, arcname=file)

    return zip_buffer.getvalue()

zip_content = create_lambda_zip()

# Delete existing to ensure clean slate (avoids update errors).
# Only "function does not exist" is ignored; any other failure is raised
# instead of being silently swallowed.
for func in ["task_lambda", "notification_lambda"]:
    try:
        lambdas.delete_function(FunctionName=func)
    except lambdas.exceptions.ResourceNotFoundException:
        pass  # nothing to delete (e.g. first run)

# Common Config
LAMBDA_ENV = {
    'Variables': {
        'AWS_ENDPOINT_URL': 'http://localstack:4566',
        'AWS_REGION': REGION
    }
}

# Both functions ship the same archive; they differ only in name and handler.
# Each handler is "<function name>.lambda_handler".
LAMBDA_NAMES = ["task_lambda", "notification_lambda"]

for func in LAMBDA_NAMES:
    lambdas.create_function(
        FunctionName=func,
        Runtime="python3.10",
        Role=ROLE_ARN,
        Handler=f"{func}.lambda_handler",
        Code={'ZipFile': zip_content},
        Environment=LAMBDA_ENV,
        Timeout=30
    )

# create_function returns before the function is ready; wait until each one
# reports Active so success is not announced (and triggers are not attached)
# while a function is still pending or has failed to start.
function_active = lambdas.get_waiter("function_active_v2")
for func in LAMBDA_NAMES:
    function_active.wait(FunctionName=func)

print("üöÄ Lambdas Deployed Successfully!")

üöÄ Lambdas Deployed Successfully!


In [12]:
# Helper to map queue to function
def add_trigger(queue_url, function_name):
    """Subscribe a Lambda function to an SQS queue.

    Looks up the ARN of the queue at ``queue_url`` and creates an event source
    mapping from it to ``function_name`` with a batch size of 1. A
    ``ResourceConflictException`` from Lambda is reported as "already exists"
    instead of being raised.
    """
    try:
        attributes = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn'],
        )['Attributes']
        lambdas.create_event_source_mapping(
            EventSourceArn=attributes['QueueArn'],
            FunctionName=function_name,
            BatchSize=1,
        )
    except lambdas.exceptions.ResourceConflictException:
        print(f"üîó Trigger already exists for {function_name}")
    else:
        # The queue name is the last path segment of its URL.
        queue_name = queue_url.split('/')[-1]
        print(f"üîó Trigger Linked: {queue_name} -> {function_name}")

# Attach each queue to the function that consumes it, task queue first.
for _queue_url, _function_name in (
    (TASK_QUEUE_URL, "task_lambda"),
    (NOTIFY_QUEUE_URL, "notification_lambda"),
):
    add_trigger(_queue_url, _function_name)

üîó Trigger already exists for task_lambda
üîó Trigger already exists for notification_lambda


In [13]:
import time
import json

task_id = f"invoice-{int(time.time())}"

# INPUT: A list of item prices
test_message = {
    "task_id": task_id,
    "numbers": [20,56]
}

sqs.send_message(
    QueueUrl=TASK_QUEUE_URL,
    MessageBody=json.dumps(test_message)
)

print(f"üì§ Message sent: {test_message}")
print("‚è≥ Waiting for processing (10s)...")

# Poll S3 for the result instead of sleeping a fixed time and checking once:
# the cell finishes as soon as the object appears and still gives up after
# the same 10 seconds announced in the message above.
RESULT_TIMEOUT_S = 10
RESULT_POLL_INTERVAL_S = 1

result_key = f"{task_id}.json"
deadline = time.monotonic() + RESULT_TIMEOUT_S
result_found = False
last_error = None

while True:
    try:
        obj = s3.get_object(Bucket=BUCKET_NAME, Key=result_key)
        result = json.loads(obj['Body'].read())
        result_found = True
        break
    except s3.exceptions.NoSuchKey as e:
        # Not written yet: keep polling until the deadline.
        last_error = e
    except Exception as e:
        # Any other failure will not be fixed by waiting: report it and stop.
        last_error = e
        break
    if time.monotonic() >= deadline:
        break
    time.sleep(RESULT_POLL_INTERVAL_S)

if result_found:
    print(f"‚úÖ RESULT FOUND in S3: {result}")
else:
    print(f"‚ùå Result not found yet: {last_error}")

üì§ Message sent: {'task_id': 'invoice-1769939685', 'numbers': [20, 56]}
‚è≥ Waiting for processing (10s)...
‚ùå Result not found yet: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist.
