In [1]:
import boto3
import json
import logging
from botocore.exceptions import ClientError
import zipfile
import os
import time
from datetime import datetime # Ensure datetime is imported if not already at the top


In [2]:
# Instantiating clients
aws_lambda = boto3.client('lambda', region_name='us-east-1')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3', region_name='us-east-1')
sqs = boto3.client('sqs', region_name="us-east-1")
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')
role = iam_client.get_role(RoleName='LabRole')
role_arn = role['Role']['Arn']
sns_client = boto3.client("sns", region_name='us-east-1')
lambda_client = boto3.client('lambda', region_name= 'us-east-1')


In [3]:
# --- Create S3 Bucket (Incoming Data Bucket) ---
def create_bucket(bucket_name, region='us-east-1'):
    """Create an S3 bucket in a specified region."""
    try:
        if region == 'us-east-1':
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
        print(f'Bucket {bucket_name} created successfully.')
    except ClientError as e:
        if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
            print(f'Bucket {bucket_name} already exists and is owned by you.')
        elif e.response['Error']['Code'] == 'BucketAlreadyExists':
            print(f'Bucket {bucket_name} already exists (owned by another account or globally).')
        else:
            logging.error(e)
            return False
    return True

incoming_bucket_name = "final-summer99d"
create_bucket(incoming_bucket_name)


Bucket final-summer99d created successfully.


True

In [6]:
# --- Upload User Emails Configuration to S3 ---
import os # Ensure os is imported

user_emails_file_path = 'user_email.json' # Make sure this file exists in your project root
s3_config_key = 'config/user_email.json' # S3 path where the config file will be stored

if incoming_bucket_name and os.path.exists(user_emails_file_path):
    print(f"\n--- Uploading {user_emails_file_path} to S3 bucket '{incoming_bucket_name}' ---")
    try:
        s3_client.upload_file(
            Filename=user_emails_file_path,
            Bucket=incoming_bucket_name,
            Key=s3_config_key
        )
        print(f"Successfully uploaded {user_emails_file_path} to s3://{incoming_bucket_name}/{s3_config_key}.")
    except ClientError as e:
        print(f"Error uploading user emails config to S3: {str(e)}")
    except Exception as e:
        print(f"Unexpected error uploading user emails config to S3: {str(e)}")
else:
    print(f"Skipping user emails config upload: '{user_emails_file_path}' not found or S3 bucket not available.")



--- Uploading user_email.json to S3 bucket 'final-summer99d' ---
Successfully uploaded user_email.json to s3://final-summer99d/config/user_email.json.


In [7]:
# ----- Create a DynamoDB table ----
def create_dynamodb_table(table_name):
    """Create a DynamoDB table for survey responses."""
    try:
        dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[
                {'AttributeName': 'user_id', 'KeyType': 'HASH'},  # Partition key
                {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}  # Sort key
            ],
            AttributeDefinitions=[
                {'AttributeName': 'user_id', 'AttributeType': 'S'},
                {'AttributeName': 'timestamp', 'AttributeType': 'S'}
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
            }
        )
        dynamodb_client.get_waiter('table_exists').wait(TableName=table_name)
        print(f'DynamoDB table {table_name} created successfully.')
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceInUseException':
            print(f'DynamoDB table {table_name} already exists.')
        else:
            logging.error(e)
            return False
    return True

# Setting beta table name
table_name = 'CyclicalBetaSurvey'

# Call the function to create the table
create_dynamodb_table(table_name)


DynamoDB table CyclicalBetaSurvey already exists.


True

In [8]:
bucket_name= "final-summer99d"

In [9]:
# --- Helper for Zipping Lambda Code ---
def create_lambda_zip(file_name, zip_name):
    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zf:
        zf.write(file_name, arcname=os.path.basename(file_name))
    print(f"{zip_name} created from {file_name}.")

# --- Helper for creating/updating Lambda Function ---
def deploy_lambda_function(function_name, handler, runtime, role_arn, zip_file_path, timeout=30):
    create_lambda_zip(handler.split('.')[0] + '.py', zip_file_path)
    with open(zip_file_path, 'rb') as f:
        lambda_zip_content = f.read()

    try:
        aws_lambda.create_function(
            FunctionName=function_name,
            Runtime=runtime,
            Role=role_arn,
            Handler=handler,
            Code=dict(ZipFile=lambda_zip_content),
            Timeout=timeout
        )
        print(f"{function_name} function created.")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceConflictException':
            print(f"{function_name} already exists. Attempting to update code.")
            aws_lambda.update_function_code(
                FunctionName=function_name,
                ZipFile=lambda_zip_content
            )
            aws_lambda.update_function_configuration(
                FunctionName=function_name,
                Handler=handler, # Update handler in case it changed
                Runtime=runtime, # Update runtime in case it changed
                Role=role_arn,   # Update role in case it changed
                Timeout=timeout
            )
            print(f"{function_name} function code and configuration updated.")
        else:
            raise e


In [26]:
# --- Deploy Lambda Functions ---
if role_arn:
    # verify (initial validation) Lambda function
    # The name of the Lambda function in AWS will be 'verify'.
    # This deploys the 'verify.py' file.
    lambda_name_verify = 'verify'  # Correctly define the function name
    deploy_lambda_function(
    function_name=lambda_name_verify,  # Use the defined variable
    handler='verify.lambda_handler',   # Points to verify.py and lambda_handler
    runtime='python3.9',
    role_arn=role_arn,
    zip_file_path=f'{lambda_name_verify}.zip'  # This will look for 'verify.zip'
)

    # cat_lambda
    lambda_name_cat = 'cat_lambda'
    deploy_lambda_function(
        function_name=lambda_name_cat,
        handler='cat_lambda.lambda_handler',
        runtime='python3.9',
        role_arn=role_arn,
        zip_file_path=f'{lambda_name_cat}.zip'
    )

    # rec_lambda
    lambda_name_rec = 'rec_lambda'
    deploy_lambda_function(
        function_name=lambda_name_rec,
        handler='rec_lambda.lambda_handler',
        runtime='python3.9',
        role_arn=role_arn,
        zip_file_path=f'{lambda_name_rec}.zip'
    )

    lambda_name_send_email = 'send_email_lambda'  # Correctly define the function name
    deploy_lambda_function(
    function_name=lambda_name_send_email,  # Use the defined variable
    handler='send_email_lambda.lambda_handler',   # Points to verify.py and lambda_handler
    runtime='python3.9',
    role_arn=role_arn,
    zip_file_path=f'{lambda_name_send_email}.zip'  # This will look for 'verify.zip'
)
else:
    print("Skipping Lambda deployments due to missing IAM Role.")

verify.zip created from verify.py.
verify function created.
cat_lambda.zip created from cat_lambda.py.
cat_lambda function created.
rec_lambda.zip created from rec_lambda.py.
rec_lambda function created.
send_email_lambda.zip created from send_email_lambda.py.
send_email_lambda function created.


In [27]:
# Add permission for S3 to invoke the Lambda function
lambda_client.add_permission(
    FunctionName='verify',  # The name of your Lambda function
    StatementId='s3-invoke',  # A unique identifier for this permission
    Action='lambda:InvokeFunction',  # The action S3 is allowed to perform
    Principal='s3.amazonaws.com',  # The service allowed to invoke the function
    SourceArn=f'arn:aws:s3:::{bucket_name}'  # The ARN of your S3 bucket
)

{'ResponseMetadata': {'RequestId': 'd0d97c93-c2af-4126-a326-f7dd9464c5ba',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Fri, 30 May 2025 20:12:23 GMT',
   'content-type': 'application/json',
   'content-length': '298',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'd0d97c93-c2af-4126-a326-f7dd9464c5ba'},
  'RetryAttempts': 0},
 'Statement': '{"Sid":"s3-invoke","Effect":"Allow","Principal":{"Service":"s3.amazonaws.com"},"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:544835564974:function:verify","Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:s3:::final-summer99d"}}}'}

In [34]:
# Set up S3 event notification to trigger Lambda
try:
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration={
            'LambdaFunctionConfigurations': [
                {
                    'LambdaFunctionArn': aws_lambda.get_function(FunctionName='verify')['Configuration']['FunctionArn'],
                    'Events': ['s3:ObjectCreated:*'],
                    'Filter': {
                        'Key': {
                            'FilterRules': [
                                {'Name': 'prefix', 'Value': 'raw_surveys/'}
                            ]
                        }
                    }
                }
            ]
        }
    )
    print(f"Set up S3 event notification for bucket {bucket_name} to trigger Lambda 'verify'.")
except ClientError as e:
    print(f"Error setting up S3 event notification: {str(e)}")

Set up S3 event notification for bucket final-summer99d to trigger Lambda 'verify'.


In [13]:
# Step 3: Create SQS Queue
queue_url = sqs.create_queue(QueueName='BetaSurveyQueuesummer99d')['QueueUrl']
print(f"SQS queue created: {queue_url}")

SQS queue created: https://sqs.us-east-1.amazonaws.com/544835564974/BetaSurveyQueuesummer99d


In [14]:
# Step 6: Get SQS Queue ARN
sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])
sqs_arn = sqs_info['Attributes']['QueueArn']
print(f"SQS ARN: {sqs_arn}")


SQS ARN: arn:aws:sqs:us-east-1:544835564974:BetaSurveyQueuesummer99d


In [29]:
queue_name_initial="BetaSurveyQueuesummer99d"

In [18]:
# Step 6: Connect SQS to Lambda
aws_lambda.create_event_source_mapping(
    EventSourceArn=sqs_arn,
    FunctionName='verify',
    Enabled=True,
    BatchSize=10
)
print("SQS queue linked to Lambda function.")

print(f"SQS Queue URL: {queue_url}")

SQS queue linked to Lambda function.
SQS Queue URL: https://sqs.us-east-1.amazonaws.com/544835564974/BetaSurveyQueuesummer99d


In [35]:
# --- Simulate Survey Data Ingestion (Upload to S3) ---

def upload_survey_to_s3(survey_path, bucket_name, prefix='raw_surveys/'):
    s3 = boto3.client('s3', region_name='us-east-1')
    
    # Read the JSON survey file
    with open(survey_path, 'r') as f:
        survey_data = json.load(f) # survey_data is now either a dict or a list
    
    # Ensure each test.json is a list as expected by verify.py
    if not isinstance(survey_data, list):
        # If a single JSON object, wrap it in a list for verify.py to process
        survey_data_to_upload = [survey_data] # Use a new variable name for clarity
    else:
        survey_data_to_upload = survey_data # If already a list, use it as is
    
    # Create a unique key for the S3 object
    # Example: raw_surveys/test1_TIMESTAMP.json
    timestamp_now = datetime.now().strftime("%Y%m%d%H%M%S%f") # More unique timestamp
    file_base_name = os.path.basename(survey_path).replace('.json', '')
    s3_key = f"{prefix}{file_base_name}_{timestamp_now}.json"
    
    # Upload the survey data (as a JSON string) to S3
    try:
        s3.put_object(
            Bucket=bucket_name,
            Key=s3_key,
            Body=json.dumps(survey_data_to_upload), # <-- CRITICAL: Use survey_data_to_upload here
            ContentType='application/json'
        )
        print(f"Uploaded {s3_key} to S3 bucket {bucket_name} with status: 200")
        return 200
    except ClientError as e:
        print(f"Error uploading {s3_key} to S3: {str(e)}")
        return 400
    except Exception as e:
        print(f"Unexpected error uploading {s3_key} to S3: {str(e)}")
        return 500

# List of survey files (assuming these are in a 'tests/' subdirectory)
survey_files = [
    'tests/test1.json',
    'tests/test2.json',
    'tests/test3.json',
    'tests/test4.json',
    'tests/test5.json',
    'tests/test6.json',
    'tests/test7.json'
]

# The bucket name is 'final-summer99d' as defined earlier in main1.ipynb
if bucket_name: # Use the variable 'incoming_bucket_name'
    print(f"\n--- Uploading survey files to S3 bucket '{bucket_name}' (this will trigger verify Lambda) --- ")
    for survey_file in survey_files:
        status = upload_survey_to_s3(survey_file, bucket_name)
        print(f"Upload of {survey_file} to S3 completed with status: {status}")
        time.sleep(2) # Give some time for Lambda to be triggered and process
else:
    print("Skipping survey uploads: Incoming S3 bucket name is not available.")


--- Uploading survey files to S3 bucket 'final-summer99d' (this will trigger verify Lambda) --- 
Uploaded raw_surveys/test1_20250530151800328749.json to S3 bucket final-summer99d with status: 200
Upload of tests/test1.json to S3 completed with status: 200
Uploaded raw_surveys/test2_20250530151802808462.json to S3 bucket final-summer99d with status: 200
Upload of tests/test2.json to S3 completed with status: 200
Uploaded raw_surveys/test3_20250530151805277418.json to S3 bucket final-summer99d with status: 200
Upload of tests/test3.json to S3 completed with status: 200
Uploaded raw_surveys/test4_20250530151807463619.json to S3 bucket final-summer99d with status: 200
Upload of tests/test4.json to S3 completed with status: 200
Uploaded raw_surveys/test5_20250530151809668412.json to S3 bucket final-summer99d with status: 200
Upload of tests/test5.json to S3 completed with status: 200
Uploaded raw_surveys/test6_20250530151811871184.json to S3 bucket final-summer99d with status: 200
Upload o

In [36]:
# Verify S3 bucket
print("S3 bucket contents:")
response = s3_client.list_objects_v2(Bucket='final-summer99d')
objects = response.get('Contents', [])
if objects:
    for obj in objects:
        print(obj['Key'])
else:
    print("No objects found in S3 bucket final-summer99d.")

S3 bucket contents:
config/user_email.json
raw_surveys/test1_20250530151234499348.json
raw_surveys/test1_20250530151708027171.json
raw_surveys/test1_20250530151800328749.json
raw_surveys/test2_20250530151236687784.json
raw_surveys/test2_20250530151710251814.json
raw_surveys/test2_20250530151802808462.json
raw_surveys/test3_20250530151238880982.json
raw_surveys/test3_20250530151712450763.json
raw_surveys/test3_20250530151805277418.json
raw_surveys/test4_20250530151241123513.json
raw_surveys/test4_20250530151714676573.json
raw_surveys/test4_20250530151807463619.json
raw_surveys/test5_20250530151243329801.json
raw_surveys/test5_20250530151716879977.json
raw_surveys/test5_20250530151809668412.json
raw_surveys/test6_20250530151245565438.json
raw_surveys/test6_20250530151719149441.json
raw_surveys/test6_20250530151811871184.json
raw_surveys/test7_20250530151247771739.json
raw_surveys/test7_20250530151721343748.json
raw_surveys/test7_20250530151814134858.json
recommendations/user_001_05272410

In [37]:
table_name='CyclicalBetaSurvey'
# --- Verify DynamoDB Records ---
print("\n--- Verifying DynamoDB records... ---")
table = dynamodb.Table(table_name)
try:
    dynamodb_records = table.scan()['Items']
    if dynamodb_records:
        print(f"Found {len(dynamodb_records)} records in DynamoDB table '{table_name}':")
        for record in dynamodb_records:
            print(record)
    else:
        print(f"No records found in DynamoDB table '{table_name}'.")
except ClientError as e:
    print(f"Error scanning DynamoDB table '{table_name}': {str(e)}")
    table_description = dynamodb_client.describe_table(TableName=table_name)
    print(f"Table status: {table_description['Table']['TableStatus']}")
    print(f"Table item count: {table_description['Table']['ItemCount']}")




--- Verifying DynamoDB records... ---
Found 5 records in DynamoDB table 'CyclicalBetaSurvey':
{'recommendations': ['Get plenty of rest and iron-rich foods like spinach and lentils.', 'Do gentle yoga or stretching instead of high-intensity workouts.', 'Stay hydrated and use heat pads for cramps.'], 'user_id': 'user_001', 'phase': 'Menstruation', 'responses': {'q1': Decimal('4'), 'q2': Decimal('0'), 'q3': Decimal('1'), 'q4': Decimal('1'), 'q5': {'symptoms': ['Cramps', 'Feeling hot or flushed', 'Digestive issues (constipation, diarrhea)'], 'additional': 'Felt dizzy after waking up.'}, 'q6': Decimal('1')}, 'timestamp': '052724100245', 'valid': True, 'time_elapsed': Decimal('10')}
{'recommendations': ['Get plenty of rest and iron-rich foods like spinach and lentils.', 'Do gentle yoga or stretching instead of high-intensity workouts.', 'Stay hydrated and use heat pads for cramps.'], 'user_id': 'user_005', 'phase': 'Menstruation', 'responses': {'q1': Decimal('3'), 'q2': Decimal('2'), 'q3': D

In [52]:
# Delete each pipeline component if it still exists:

# Lambdas
lambda_functions=['verify', 'cat_lambda','rec_lambda', "send_email_lambda"]
for function_name in lambda_functions:
    try:
        lambda_client.delete_function(FunctionName=function_name)
        print(f"Deleted Lambda function: {function_name}")
    except lambda_client.exceptions.ResourceNotFoundException:
        print(f"Lambda function {function_name} not found")
    except Exception as e:
        print(f"Error deleting Lambda function {function_name}: {e}")

# SQS
try:
    sqs.delete_queue(QueueUrl='https://sqs.us-east-1.amazonaws.com/544835564974/BetaSurveyQueuesummer99d')
    print("SQS Queue Deleted")
except sqs.exceptions.QueueDoesNotExist:
    print("SQS Queue Already Deleted")

# DynamoDB
dynamodb = boto3.client('dynamodb', region_name='us-east-1')
try:
    response = dynamodb.delete_table(TableName='CyclicalBetaSurvey')
    print("DynamoDB Table Deleted")
except dynamodb.exceptions.ResourceNotFoundException:
    print("DynamoDB Table Already Deleted")

Lambda function verify not found
Lambda function cat_lambda not found
Lambda function rec_lambda not found
Deleted Lambda function: send_email_lambda
SQS Queue Already Deleted
DynamoDB Table Already Deleted
