# Genomics VEP Pipeline Deployment

This notebook deploys the complete end-to-end genomics VEP processing pipeline.

## Architecture Overview:
1. **VCF Upload** → S3 Input Bucket triggers Lambda
2. **VEP Workflow** → HealthOmics processes VCFs with VEP annotation
3. **EventBridge Monitoring** → Tracks workflow completion
4. **Variant Store Import** → Automatically imports VEP results
5. **Lake Formation Setup** → Creates database permissions for agent queries

## Prerequisites:
- CloudFormation stack deployed using `infrastructure.yaml`
- AWS CLI configured with appropriate permissions
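- Deployment packages present in the `./lambda/` directory: `workflow.zip`, `genomics-vep-pipeline-vcf-processor-comprehensive.zip`, and `genomics-vep-pipeline-workflow-monitor-fixed.zip`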

## Deployment Steps:
1. Configure parameters
2. Create HealthOmics VEP workflow
3. Create variant and annotation stores
4. Deploy Lambda functions
5. Configure S3 triggers
6. Setup Lake Formation permissions
7. Test the pipeline

In [None]:
import boto3
import json
import time
import zipfile
import base64
import os
import uuid
from datetime import datetime
from pathlib import Path

## Configuration

**IMPORTANT**: Update these parameters to match your CloudFormation stack and requirements.

In [None]:
# AWS Configuration
# The profile and region below are placeholders and must be replaced before
# this cell is run; the stack name must match the deployed CloudFormation stack.
AWS_PROFILE = '<YOUR_AWS_PROFILE>'  # UPDATE THIS
AWS_REGION = '<YOUR_REGION>'  # UPDATE THIS
CLOUDFORMATION_STACK_NAME = 'genomics-vep-stack'  # UPDATE THIS

# Set AWS profile
# The profile is exported to the environment in addition to being passed to the
# Session. Later cells call boto3.client(...) / boto3.resource(...) directly
# rather than going through `session`; presumably the export is what keeps those
# calls on the same profile — confirm.
os.environ['AWS_PROFILE'] = AWS_PROFILE
session = boto3.Session(profile_name=AWS_PROFILE, region_name=AWS_REGION)

# Initialize clients
# All clients below share the profile/region of `session`.
cf_client = session.client('cloudformation')
omics_client = session.client('omics')
lambda_client = session.client('lambda')
s3_client = session.client('s3')
iam_client = session.client('iam')
lakeformation = session.client('lakeformation')
glue = session.client('glue')
events_client = session.client('events')
sts_client = session.client('sts')

# Get account info
# The account ID is interpolated into the Lambda and HealthOmics ARNs built in later cells.
account_id = sts_client.get_caller_identity()['Account']
print(f"Account ID: {account_id}")
print(f"Region: {AWS_REGION}")
print(f"Profile: {AWS_PROFILE}")

## Get CloudFormation Stack Outputs

In [None]:
# Get CloudFormation stack outputs
def get_stack_outputs(stack_name):
    """Return a CloudFormation stack's outputs as an ``{OutputKey: OutputValue}`` dict.

    Args:
        stack_name: Name passed to ``describe_stacks`` as ``StackName``.

    Returns:
        dict: One entry per stack output. A stack without outputs, or any
        error while describing the stack, yields an empty dict; errors are
        printed rather than raised.
    """
    try:
        first_stack = cf_client.describe_stacks(StackName=stack_name)['Stacks'][0]
        return {
            entry['OutputKey']: entry['OutputValue']
            for entry in first_stack.get('Outputs', [])
        }
    except Exception as exc:
        print(f"Error getting stack outputs: {exc}")
        return {}

stack_outputs = get_stack_outputs(CLOUDFORMATION_STACK_NAME)
print("CloudFormation Stack Outputs:")
for key, value in stack_outputs.items():
    print(f"  {key}: {value}")

# Extract key values
# .get() leaves a variable as None when the stack does not expose that output.
VCF_INPUT_BUCKET = stack_outputs.get('VcfInputBucketName')
VEP_OUTPUT_BUCKET = stack_outputs.get('VepOutputBucketName')
DYNAMODB_TABLE = stack_outputs.get('DynamoDBTableName')
WORKFLOW_ROLE_ARN = stack_outputs.get('HealthOmicsWorkflowRoleArn')
AGENT_ROLE_ARN = stack_outputs.get('AgentRoleArn')
DATABASE_NAME = stack_outputs.get('DatabaseName')
VCF_PROCESSOR_FUNCTION = stack_outputs.get('VcfProcessorFunctionName')
WORKFLOW_MONITOR_FUNCTION = stack_outputs.get('WorkflowMonitorFunctionName')

# Report missing outputs here instead of letting a None value surface later as
# a confusing error (for example an 's3://None' output URI or FunctionName=None).
_expected_output_keys = (
    'VcfInputBucketName',
    'VepOutputBucketName',
    'DynamoDBTableName',
    'HealthOmicsWorkflowRoleArn',
    'AgentRoleArn',
    'DatabaseName',
    'VcfProcessorFunctionName',
    'WorkflowMonitorFunctionName',
)
_missing_output_keys = [
    output_key for output_key in _expected_output_keys
    if output_key not in stack_outputs
]
if _missing_output_keys:
    print(f"⚠️  Missing CloudFormation outputs: {', '.join(_missing_output_keys)}")
    print("   Check CLOUDFORMATION_STACK_NAME, AWS_PROFILE and AWS_REGION before continuing.")

# Part 2: HealthOmics Workflow Creation

In [None]:
# Configuration for HealthOmics resources
WORKFLOW_NAME = 'vep-workflow2'  # name used to look up or create the HealthOmics workflow
VARIANT_STORE_NAME = 'genomicsvariantstore'  # also passed to the workflow monitor Lambda as an env var
ANNOTATION_STORE_NAME = 'genomicsannotationstore'  # also passed to the workflow monitor Lambda as an env var
# The two IDs below are combined into the reference ARN used when creating the
# variant and annotation stores.
REFERENCE_STORE_ID = '<YOUR_REFERENCE_STORE_ID>'  # UPDATE your reference store ID
# NOTE: unlike the placeholder above, this is a concrete value; replace it with
# the ID of a reference genome that exists in your own reference store.
REFERENCE_GENOME_ID = '8931115867' # UPDATE your reference genome ID
BATCH_SIZE = 20  # forwarded to the VCF processor Lambda as the BATCH_SIZE env var

print(f"Workflow Name: {WORKFLOW_NAME}")
print(f"Variant Store: {VARIANT_STORE_NAME}")
print(f"Annotation Store: {ANNOTATION_STORE_NAME}")
print(f"Reference Store ID: {REFERENCE_STORE_ID}")

## Create HealthOmics VEP Workflow

In [None]:
def create_vep_workflow(poll_interval=30, max_wait=3600):
    """Create the VEP HealthOmics workflow from ``./lambda/workflow.zip``.

    If a workflow named ``WORKFLOW_NAME`` already exists (all result pages are
    searched), its ID is returned and nothing is created. Otherwise the zip is
    uploaded with ``create_workflow`` and the function polls ``get_workflow``
    until the workflow reports ``ACTIVE``.

    Args:
        poll_interval: Seconds to sleep between status checks.
        max_wait: Maximum number of seconds to wait for ``ACTIVE`` before
            giving up, so a workflow stuck in a non-terminal state cannot
            block the notebook forever.

    Returns:
        str | None: The workflow ID, or None when the zip file is missing, the
        workflow ends in ``FAILED``/``INACTIVE``, the wait times out, or any
        API call raises. Errors are printed rather than raised.
    """

    workflow_zip_path = './lambda/workflow.zip'

    if not os.path.exists(workflow_zip_path):
        print(f"Error: {workflow_zip_path} not found")
        return None

    # The raw bytes are what create_workflow receives as definitionZip.
    with open(workflow_zip_path, 'rb') as f:
        workflow_zip_content = f.read()

    # Parameter template for the VEP workflow
    parameter_template = {
        "id": {
            "description": "Sample identifier",
            "optional": False
        },
        "vcf": {
            "description": "Input VCF file path (S3 URI)",
            "optional": False
        },
        "vep_species": {
            "description": "Species name (e.g., homo_sapiens)",
            "optional": False
        },
        "vep_cache": {
            "description": "S3 path to the VEP cache directory",
            "optional": False
        },
        "vep_cache_version": {
            "description": "VEP cache version (e.g., 113)",
            "optional": False
        },
        "ecr_registry": {
            "description": "ECR registry path for container images",
            "optional": False
        },
        "vep_genome": {
            "description": "Reference genome build (e.g., GRCh38)",
            "optional": False
        }
    }

    try:
        # Check if workflow already exists. Walk every page: a single
        # list_workflows call only returns the first page of results.
        paginator = omics_client.get_paginator('list_workflows')
        for page in paginator.paginate():
            for workflow in page.get('items', []):
                if workflow['name'] == WORKFLOW_NAME:
                    print(f"Workflow '{WORKFLOW_NAME}' already exists with ID: {workflow['id']}")
                    return workflow['id']

        # Create new workflow
        print(f"Creating VEP workflow: {WORKFLOW_NAME}")

        response = omics_client.create_workflow(
            name=WORKFLOW_NAME,
            description='VEP (Variant Effect Predictor) workflow for genomic variant annotation',
            engine='NEXTFLOW',
            definitionZip=workflow_zip_content,
            parameterTemplate=parameter_template,
            storageCapacity=1200
        )

        workflow_id = response['id']
        print(f"Workflow created successfully with ID: {workflow_id}")

        # Wait for workflow to be ready
        print("Waiting for workflow to be ready...")
        deadline = time.monotonic() + max_wait
        while True:
            workflow_details = omics_client.get_workflow(id=workflow_id)
            status = workflow_details['status']
            print(f"Workflow status: {status}")

            if status == 'ACTIVE':
                print("Workflow is ready!")
                break
            elif status in ['FAILED', 'INACTIVE']:
                print(f"Workflow creation failed with status: {status}")
                if 'statusMessage' in workflow_details:
                    print(f"Error message: {workflow_details['statusMessage']}")
                return None

            if time.monotonic() >= deadline:
                print(f"Timed out after {max_wait}s waiting for workflow {workflow_id} to become ACTIVE")
                return None

            time.sleep(poll_interval)

        return workflow_id

    except Exception as e:
        print(f"Error creating workflow: {e}")
        return None

# Create the workflow
# WORKFLOW_ID is read by later cells (Lambda env vars and the EventBridge rule).
if __name__ == "__main__":
    WORKFLOW_ID = create_vep_workflow()
    print(f"\nVEP Workflow ID: {WORKFLOW_ID}")

## Create Variant Store & Annotation Store

In [None]:
def create_variant_store():
    """Return the ID of the variant store named ``VARIANT_STORE_NAME``, creating it if needed.

    All pages of existing variant stores are searched first. When no store
    with that name exists, creation is started with a reference ARN built from
    ``AWS_REGION``, ``account_id``, ``REFERENCE_STORE_ID`` and
    ``REFERENCE_GENOME_ID``. The function does not wait for the new store to
    finish creating.

    Returns:
        str | None: The store ID, or None if any API call raises (the error
        is printed).
    """

    try:
        # Check if variant store already exists. Walk every page: a single
        # list_variant_stores call only returns the first page of results.
        paginator = omics_client.get_paginator('list_variant_stores')
        for page in paginator.paginate():
            for store in page.get('variantStores', []):
                if store['name'] == VARIANT_STORE_NAME:
                    print(f"✅ Variant store '{VARIANT_STORE_NAME}' already exists with ID: {store['id']}")
                    return store['id']

        # Create new variant store
        print(f"🚀 Creating variant store: {VARIANT_STORE_NAME}")

        response = omics_client.create_variant_store(
            name=VARIANT_STORE_NAME,
            description='Variant store for genomics VEP pipeline',
            reference={
                'referenceArn': f'arn:aws:omics:{AWS_REGION}:{account_id}:referenceStore/{REFERENCE_STORE_ID}/reference/{REFERENCE_GENOME_ID}'
            }
        )

        variant_store_id = response['id']
        print(f"✅ Variant store creation initiated with ID: {variant_store_id}")
        print("⏳ Store is being created in the background...")
        return variant_store_id

    except Exception as e:
        print(f"❌ Error creating variant store: {e}")
        return None

def create_annotation_store():
    """Return the ID of the annotation store named ``ANNOTATION_STORE_NAME``, creating it if needed.

    All pages of existing annotation stores are searched first. When no store
    with that name exists, creation of a ``VCF``-format store is started with
    a reference ARN built from ``AWS_REGION``, ``account_id``,
    ``REFERENCE_STORE_ID`` and ``REFERENCE_GENOME_ID``. The function does not
    wait for the new store to finish creating.

    Returns:
        str | None: The store ID, or None if any API call raises (the error
        is printed).
    """

    try:
        # Check if annotation store already exists. Walk every page: a single
        # list_annotation_stores call only returns the first page of results.
        paginator = omics_client.get_paginator('list_annotation_stores')
        for page in paginator.paginate():
            for store in page.get('annotationStores', []):
                if store['name'] == ANNOTATION_STORE_NAME:
                    print(f"✅ Annotation store '{ANNOTATION_STORE_NAME}' already exists with ID: {store['id']}")
                    return store['id']

        # Create new annotation store
        print(f"🚀 Creating annotation store: {ANNOTATION_STORE_NAME}")

        response = omics_client.create_annotation_store(
            name=ANNOTATION_STORE_NAME,
            description='Annotation store for genomics VEP pipeline',
            storeFormat='VCF',
            reference={
                'referenceArn': f'arn:aws:omics:{AWS_REGION}:{account_id}:referenceStore/{REFERENCE_STORE_ID}/reference/{REFERENCE_GENOME_ID}'
            }
        )

        annotation_store_id = response['id']
        print(f"✅ Annotation store creation initiated with ID: {annotation_store_id}")
        print("⏳ Store is being created in the background...")
        return annotation_store_id

    except Exception as e:
        print(f"❌ Error creating annotation store: {e}")
        return None

# Start (or look up) both analytics stores; each ID is None if its call failed.
VARIANT_STORE_ID = create_variant_store()
ANNOTATION_STORE_ID = create_annotation_store()

# Print the summary in one call, one line per entry.
print(
    "\n📋 Summary:",
    f"Variant Store ID: {VARIANT_STORE_ID}",
    f"Annotation Store ID: {ANNOTATION_STORE_ID}",
    "\n💡 Use the status checker below to monitor progress",
    sep="\n",
)

In [None]:
def quick_status_check(store_name_or_id):
    """Return a one-line status summary for a HealthOmics variant store.

    Args:
        store_name_or_id: Value passed to ``get_variant_store`` as ``name``.
            NOTE(review): the parameter name suggests an ID is accepted too;
            confirm against the API before passing one.

    Returns:
        str: ``"Status: <status> | ID: <id>"`` on success, a "not found"
        message when the service reports the store does not exist, or an
        error message for any other failure. Nothing is raised.
    """
    try:
        store = omics_client.get_variant_store(name=store_name_or_id)
        return f"Status: {store['status']} | ID: {store['id']}"
    except omics_client.exceptions.ResourceNotFoundException:
        return "Store not found or still creating"
    except Exception as exc:
        # Report other failures (credentials, permissions, throttling) as what
        # they are instead of disguising them as "not found". Catching
        # Exception rather than using a bare except also lets Ctrl-C through.
        return f"Error checking variant store: {exc}"

# Usage
print(quick_status_check(VARIANT_STORE_NAME))

In [None]:
def quick_status_check(store_name_or_id):
    """Return a one-line status summary for a HealthOmics annotation store.

    This definition reuses the name ``quick_status_check`` from the previous
    cell, so once this cell has run the name refers to the annotation-store
    version.

    Args:
        store_name_or_id: Value passed to ``get_annotation_store`` as ``name``.
            NOTE(review): the parameter name suggests an ID is accepted too;
            confirm against the API before passing one.

    Returns:
        str: ``"Status: <status> | ID: <id>"`` on success, a "not found"
        message when the service reports the store does not exist, or an
        error message for any other failure. Nothing is raised.
    """
    try:
        store = omics_client.get_annotation_store(name=store_name_or_id)
        return f"Status: {store['status']} | ID: {store['id']}"
    except omics_client.exceptions.ResourceNotFoundException:
        return "Store not found or still creating"
    except Exception as exc:
        # Report other failures (credentials, permissions, throttling) as what
        # they are instead of disguising them as "not found". Catching
        # Exception rather than using a bare except also lets Ctrl-C through.
        return f"Error checking annotation store: {exc}"

# Usage
print(quick_status_check(ANNOTATION_STORE_NAME))

# Part 3: Lambda Function Deployment

## Deploy VCF Processor Lambda Function

In [None]:
def deploy_vcf_processor_lambda():
    """Upload the VCF processor Lambda package and set its environment variables.

    Steps:
      1. Upload ``./lambda/genomics-vep-pipeline-vcf-processor-comprehensive.zip``
         to the function named by ``VCF_PROCESSOR_FUNCTION``.
      2. Poll ``get_function`` (up to 30 checks, 10 s apart) until
         ``LastUpdateStatus`` is ``Successful``. A timeout only produces a
         warning and the configuration update is still attempted.
      3. Set the environment variables, retrying with exponential backoff
         while Lambda reports ``ResourceConflictException``.

    Returns:
        bool: True when code and configuration were both updated; False when
        the zip is missing, the code update reports ``Failed``, the
        configuration update keeps conflicting, or any other error occurs.
        Errors are printed rather than raised.
    """

    lambda_zip_path = './lambda/genomics-vep-pipeline-vcf-processor-comprehensive.zip'

    if not os.path.exists(lambda_zip_path):
        print(f"Error: {lambda_zip_path} not found")
        return False

    try:
        # Read the Lambda function code
        with open(lambda_zip_path, 'rb') as f:
            lambda_zip_content = f.read()

        # Step 1: Update the Lambda function code
        print(f"🔄 Updating Lambda function code: {VCF_PROCESSOR_FUNCTION}")

        lambda_client.update_function_code(
            FunctionName=VCF_PROCESSOR_FUNCTION,
            ZipFile=lambda_zip_content
        )

        print(f"✅ Lambda function code updated successfully")

        # Step 2: Wait for the code update to complete before updating configuration
        print("⏳ Waiting for code update to complete...")

        max_attempts = 30  # 5 minutes max
        for attempt in range(max_attempts):
            try:
                function_info = lambda_client.get_function(FunctionName=VCF_PROCESSOR_FUNCTION)
                last_update_status = function_info['Configuration']['LastUpdateStatus']

                print(f"   Attempt {attempt + 1}: Status = {last_update_status}")

                if last_update_status == 'Successful':
                    print("✅ Code update completed successfully")
                    break
                elif last_update_status == 'Failed':
                    print("❌ Code update failed")
                    return False
                elif last_update_status == 'InProgress':
                    print("   Still updating code...")
                else:
                    print(f"   Unknown status: {last_update_status}")

            except Exception as e:
                # A failed status check is not fatal; try again on the next pass.
                print(f"   Error checking status: {e}")

            time.sleep(10)  # Wait 10 seconds between checks
        else:
            # Loop ended without a break: the update never reported Successful.
            print("⚠️  Timeout waiting for code update to complete")
            print("💡 Trying configuration update anyway...")

        # Step 3: Update environment variables
        print("🔧 Updating environment variables...")

        env_vars = {
            'WORKFLOW_ID': str(WORKFLOW_ID) if WORKFLOW_ID else '',
            'ROLE_ARN': WORKFLOW_ROLE_ARN,
            'OUTPUT_URI': f's3://{VEP_OUTPUT_BUCKET}',
            'DYNAMODB_TABLE': DYNAMODB_TABLE,
            'BATCH_SIZE': str(BATCH_SIZE),
            'ALLOWED_PREFIXES': ''  # Empty means all prefixes allowed
        }

        # Wait a bit more before configuration update
        time.sleep(5)

        # Retry configuration update with exponential backoff.
        # NOTE(review): Environment is sent as a complete map; presumably this
        # replaces any variables already set on the function (for example by
        # the CloudFormation stack) — confirm against the Lambda API docs.
        max_config_attempts = 5
        for config_attempt in range(max_config_attempts):
            try:
                lambda_client.update_function_configuration(
                    FunctionName=VCF_PROCESSOR_FUNCTION,
                    Environment={'Variables': env_vars}
                )

                print("✅ Environment variables updated successfully")
                break

            except lambda_client.exceptions.ResourceConflictException:
                # Give up straight away after the last attempt instead of
                # sleeping through one more backoff period first.
                if config_attempt == max_config_attempts - 1:
                    print("❌ Failed to update configuration after multiple attempts")
                    print("💡 You can update environment variables manually in the AWS Console")
                    return False

                wait_time = 2 ** config_attempt  # Exponential backoff: 1, 2, 4, 8 seconds
                print(f"   Configuration update conflict (attempt {config_attempt + 1}). Waiting {wait_time}s...")
                time.sleep(wait_time)
            except Exception as e:
                print(f"❌ Error updating configuration: {e}")
                return False

        print("Environment variables updated:")
        for key, value in env_vars.items():
            print(f"  {key}: {value}")

        return True

    except Exception as e:
        print(f"❌ Error deploying VCF processor Lambda: {e}")
        return False

# Deploy VCF processor Lambda
vcf_processor_deployed = deploy_vcf_processor_lambda()
print(f"\n📊 VCF Processor Lambda deployed: {vcf_processor_deployed}")

## Deploy Workflow Monitor Lambda Function

In [None]:
def deploy_workflow_monitor_lambda():
    """Upload the workflow monitor Lambda package and set its environment variables.

    Steps:
      1. Upload ``./lambda/genomics-vep-pipeline-workflow-monitor-fixed.zip``
         to the function named by ``WORKFLOW_MONITOR_FUNCTION``.
      2. Poll ``get_function`` (up to 30 checks, 10 s apart) until
         ``LastUpdateStatus`` is ``Successful``. A timeout only produces a
         warning and the configuration update is still attempted.
      3. Set the environment variables, retrying with exponential backoff
         while Lambda reports ``ResourceConflictException``.

    Returns:
        bool: True when code and configuration were both updated; False when
        the zip is missing, the code update reports ``Failed``, the
        configuration update keeps conflicting, or any other error occurs.
        Errors are printed rather than raised.
    """

    lambda_zip_path = './lambda/genomics-vep-pipeline-workflow-monitor-fixed.zip'

    if not os.path.exists(lambda_zip_path):
        print(f"Error: {lambda_zip_path} not found")
        return False

    try:
        # Read the Lambda function code
        with open(lambda_zip_path, 'rb') as f:
            lambda_zip_content = f.read()

        # Step 1: Update the Lambda function code
        print(f"🔄 Updating Lambda function code: {WORKFLOW_MONITOR_FUNCTION}")

        lambda_client.update_function_code(
            FunctionName=WORKFLOW_MONITOR_FUNCTION,
            ZipFile=lambda_zip_content
        )

        print(f"✅ Lambda function code updated successfully")

        # Step 2: Wait for the code update to complete before updating configuration
        print("⏳ Waiting for code update to complete...")

        max_attempts = 30  # 5 minutes max
        for attempt in range(max_attempts):
            try:
                function_info = lambda_client.get_function(FunctionName=WORKFLOW_MONITOR_FUNCTION)
                last_update_status = function_info['Configuration']['LastUpdateStatus']

                print(f"   Attempt {attempt + 1}: Status = {last_update_status}")

                if last_update_status == 'Successful':
                    print("✅ Code update completed successfully")
                    break
                elif last_update_status == 'Failed':
                    print("❌ Code update failed")
                    return False
                elif last_update_status == 'InProgress':
                    print("   Still updating code...")
                else:
                    print(f"   Unknown status: {last_update_status}")

            except Exception as e:
                # A failed status check is not fatal; try again on the next pass.
                print(f"   Error checking status: {e}")

            time.sleep(10)  # Wait 10 seconds between checks
        else:
            # Loop ended without a break: the update never reported Successful.
            print("⚠️  Timeout waiting for code update to complete")
            print("💡 Trying configuration update anyway...")

        # Step 3: Update environment variables
        print("🔧 Updating environment variables...")

        env_vars = {
            'ROLE_ARN': WORKFLOW_ROLE_ARN,
            'VARIANT_STORE_NAME': VARIANT_STORE_NAME,
            'ANNOTATION_STORE_NAME': ANNOTATION_STORE_NAME,
            'DYNAMODB_TABLE': DYNAMODB_TABLE,
            'DATABASE_NAME': DATABASE_NAME
        }

        # Wait a bit more before configuration update
        time.sleep(5)

        # Retry configuration update with exponential backoff.
        # NOTE(review): Environment is sent as a complete map; presumably this
        # replaces any variables already set on the function (for example by
        # the CloudFormation stack) — confirm against the Lambda API docs.
        max_config_attempts = 5
        for config_attempt in range(max_config_attempts):
            try:
                lambda_client.update_function_configuration(
                    FunctionName=WORKFLOW_MONITOR_FUNCTION,
                    Environment={'Variables': env_vars}
                )

                print("✅ Environment variables updated successfully")
                break

            except lambda_client.exceptions.ResourceConflictException:
                # Give up straight away after the last attempt instead of
                # sleeping through one more backoff period first.
                if config_attempt == max_config_attempts - 1:
                    print("❌ Failed to update configuration after multiple attempts")
                    print("💡 You can update environment variables manually in the AWS Console")
                    return False

                wait_time = 2 ** config_attempt  # Exponential backoff: 1, 2, 4, 8 seconds
                print(f"   Configuration update conflict (attempt {config_attempt + 1}). Waiting {wait_time}s...")
                time.sleep(wait_time)
            except Exception as e:
                print(f"❌ Error updating configuration: {e}")
                return False

        print("Environment variables updated:")
        for key, value in env_vars.items():
            print(f"  {key}: {value}")

        return True

    except Exception as e:
        print(f"❌ Error deploying workflow monitor Lambda: {e}")
        return False

# Deploy workflow monitor Lambda
workflow_monitor_deployed = deploy_workflow_monitor_lambda()
print(f"\n📊 Workflow Monitor Lambda deployed: {workflow_monitor_deployed}")

## Configure S3 Event Notifications

In [None]:
def configure_s3_notifications_fixed():
    """Attach Lambda triggers to the VCF input bucket and the VEP output bucket.

    Input bucket: objects created with suffix ``.vcf`` or ``.vcf.gz`` invoke
    the VCF processor function. Output bucket: objects created with suffix
    ``.ann.vcf.gz`` or ``.ann.vcf`` invoke the workflow monitor function.
    The input bucket is configured first; an error on either bucket stops the
    function.

    Returns:
        bool: True when both buckets were configured, False if any step
        raised (the error is printed).
    """

    def lambda_trigger(trigger_id, function_name, suffix):
        # One LambdaFunctionConfigurations entry: run `function_name` for every
        # object-created event whose key ends with `suffix`.
        return {
            'Id': trigger_id,
            'LambdaFunctionArn': f'arn:aws:lambda:{AWS_REGION}:{account_id}:function:{function_name}',
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {
                'Key': {
                    'FilterRules': [
                        {'Name': 'suffix', 'Value': suffix}
                    ]
                }
            }
        }

    try:
        # Input bucket: plain and gzip-compressed VCF uploads.
        print(f"Configuring S3 notifications for input bucket: {VCF_INPUT_BUCKET}")
        s3_client.put_bucket_notification_configuration(
            Bucket=VCF_INPUT_BUCKET,
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [
                    lambda_trigger('VcfProcessorTrigger', VCF_PROCESSOR_FUNCTION, '.vcf'),
                    lambda_trigger('VcfGzProcessorTrigger', VCF_PROCESSOR_FUNCTION, '.vcf.gz'),
                ]
            }
        )
        print("✅ VCF input bucket notifications configured successfully")

        # Output bucket: VEP-annotated files, compressed and uncompressed.
        print(f"Configuring S3 notifications for output bucket: {VEP_OUTPUT_BUCKET}")
        s3_client.put_bucket_notification_configuration(
            Bucket=VEP_OUTPUT_BUCKET,
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [
                    lambda_trigger('VepAnnotatedVcfTrigger', WORKFLOW_MONITOR_FUNCTION, '.ann.vcf.gz'),
                    lambda_trigger('VepAnnotatedVcfUncompressedTrigger', WORKFLOW_MONITOR_FUNCTION, '.ann.vcf'),
                ]
            }
        )
        print("✅ VEP output bucket notifications configured successfully")

        return True

    except Exception as exc:
        print(f"❌ Error configuring S3 notifications: {exc}")
        return False

## Configure EventBridge monitoring

In [None]:
def configure_eventbridge_workflow_monitoring():
    """Create an EventBridge rule that forwards HealthOmics run status changes to the monitor Lambda.

    The rule matches ``aws.omics`` events whose status is ``COMPLETED``,
    ``FAILED`` or ``CANCELLED``. When ``WORKFLOW_ID`` is set, the rule is
    further restricted to that workflow; when it is not, the ``workflowId``
    filter is left out of the pattern. The target receives a reduced payload
    containing ``runId``, ``status`` and ``workflowId``.

    Returns:
        bool: True when a matching rule already exists or the rule and its
        target were created; False if any API call raised (the error is
        printed).
    """

    try:
        # Check if EventBridge rule already exists
        # NOTE(review): this prefix ('...-workflow-status') does not match the
        # name of the rule created below ('...-workflow-completion-monitor'),
        # so the check only detects a rule created elsewhere — presumably by
        # the CloudFormation stack; confirm.
        existing_rules = events_client.list_rules(NamePrefix=f'{CLOUDFORMATION_STACK_NAME}-workflow-status')

        if existing_rules['Rules']:
            print("✅ EventBridge workflow monitoring rule already exists")
            return True

        # Create EventBridge rule for workflow completion
        rule_name = f'{CLOUDFORMATION_STACK_NAME}-workflow-completion-monitor'

        print(f"🔧 Creating EventBridge rule: {rule_name}")

        # Add the workflowId filter only when an ID is available: an empty
        # array is not a valid value in an event pattern, so it would make
        # put_rule fail instead of meaning "no filter".
        detail_filter = {"status": ["COMPLETED", "FAILED", "CANCELLED"]}
        if WORKFLOW_ID:
            detail_filter["workflowId"] = [str(WORKFLOW_ID)]
        else:
            print("⚠️  WORKFLOW_ID is not set; the rule will match runs of every workflow")

        # Create the rule
        # NOTE(review): confirm the detail-type string and the detail field
        # names (runId, workflowId) against the HealthOmics EventBridge docs.
        events_client.put_rule(
            Name=rule_name,
            EventPattern=json.dumps({
                "source": ["aws.omics"],
                "detail-type": ["HealthOmics Run Status Change"],
                "detail": detail_filter
            }),
            State='ENABLED',
            Description='Monitor VEP workflow completion for automatic variant import'
        )

        # Add Lambda target
        events_client.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': '1',
                    'Arn': f'arn:aws:lambda:{AWS_REGION}:{account_id}:function:{WORKFLOW_MONITOR_FUNCTION}',
                    # Reshape the event into the small JSON document below.
                    'InputTransformer': {
                        'InputPathsMap': {
                            'runId': '$.detail.runId',
                            'status': '$.detail.status',
                            'workflowId': '$.detail.workflowId'
                        },
                        'InputTemplate': '{"source": "eventbridge", "runId": "<runId>", "status": "<status>", "workflowId": "<workflowId>"}'
                    }
                }
            ]
        )

        print("✅ EventBridge workflow monitoring configured successfully")
        return True

    except Exception as e:
        print(f"❌ Error configuring EventBridge: {e}")
        return False


## Applying and summarizing the trigger configuration

In [None]:
def configure_complete_pipeline_triggers():
    """Run the S3 and EventBridge trigger setup and print a summary.

    Step 3 only prints a message: the output-bucket notifications it mentions
    are applied inside ``configure_s3_notifications_fixed`` during step 1.

    Returns:
        bool: True only when both the S3 and the EventBridge configuration
        reported success.
    """

    def outcome(succeeded):
        # Label shown in the summary for one configuration step.
        return '✅ Success' if succeeded else '❌ Failed'

    print("🚀 Configuring Complete Pipeline Triggers")
    print("=" * 50)

    # Step 1: S3 notifications (VCF upload starts the VEP workflow).
    print("1️⃣ Configuring S3 notifications for VCF input...")
    s3_ok = configure_s3_notifications_fixed()

    # Step 2: EventBridge rule (workflow completion starts the variant import).
    print("\n2️⃣ Configuring EventBridge for workflow monitoring...")
    eventbridge_ok = configure_eventbridge_workflow_monitoring()

    # Step 3 message, outcome summary, and a recap of the pipeline flow.
    report_lines = [
        "\n3️⃣ S3 notifications for VEP output configured as backup",
        "\n📊 Configuration Summary:",
        f"   S3 Input Triggers: {outcome(s3_ok)}",
        f"   EventBridge Monitoring: {outcome(eventbridge_ok)}",
        "\n🔄 Pipeline Flow:",
        "   1. Upload VCF → S3 Input Bucket → VCF Processor Lambda → VEP Workflow",
        "   2. VEP Workflow Complete → EventBridge → Workflow Monitor Lambda → Variant Import",
        "   3. Variant Import Complete → Data ready for Agent queries",
    ]
    print("\n".join(report_lines))

    return s3_ok and eventbridge_ok

# Configure the complete pipeline
pipeline_configured = configure_complete_pipeline_triggers()

# Part 3b: Running analytics on the HealthOmics variant store

### Python package imports

In [None]:
from datetime import datetime
import json
import os
import time
import urllib

import boto3
import botocore.exceptions

### Create a service IAM role

For the purposes of this tutorial, we will use the following policy and trust policy to demo the capabilities of AWS HealthOmics. You are free to customize the permissions as required for your use case after going through this tutorial.

NOTE: In this case we've defined rather permissive permissions (i.e. "*" Resources). In particular, we are allowing read/write access to all S3 buckets available to the account for this tutorial. In a real world setting you will want to scope this down to only the minimally needed actions on necessary resources.

In [None]:
# set a timestamp
dt_fmt = '%Y%m%dT%H%M%S'
ts = datetime.now().strftime(dt_fmt)

In [None]:
# Permissions policy attached to the tutorial role. Every statement uses
# Resource "*", i.e. it is not scoped to particular resources.
demo_policy = {
  "Version": "2012-10-17",
  "Statement": [
    # All HealthOmics actions.
    {
      "Effect": "Allow",
      "Action": [
        "omics:*"
      ],
      "Resource": "*"
    },
    # Viewing and accepting AWS RAM resource share invitations.
    {
      "Effect": "Allow",
      "Action": [
        "ram:AcceptResourceShareInvitation",
        "ram:GetResourceShareInvitations"
      ],
      "Resource": "*"
    },
    # S3 object read/write, listing, multipart upload and ACL actions.
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:GetObjectAcl",
        "s3:PutObjectAcl"
      ],
      "Resource": "*"
    }
  ]
}

# Trust policy: lets the omics.amazonaws.com service principal assume the role.
demo_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "omics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

In order to proceed, we need to create a couple of resources. The first is the role that you will be passing into AWS HealthOmics. If the role doesn't exist, we will need to create it and attach the policy and trust policy defined above.

In [None]:
# Base name shared by the tutorial role and its policy.
omics_iam_name = f'OmicsTutorialRole-{ts}'

# IAM resource interface (default boto3 session).
iam = boto3.resource('iam')

# Load the role if it exists; when IAM reports NoSuchEntity, create the role,
# create its policy and attach the two. Any other client error is printed.
role = iam.Role(omics_iam_name)
try:
    role.load()
except botocore.exceptions.ClientError as ex:
    if ex.response["Error"]["Code"] != "NoSuchEntity":
        print('Something went wrong, please retry and check your account settings and permissions')
        print(ex)
    else:
        # Role with the trust policy that lets HealthOmics assume it.
        role = iam.create_role(
            RoleName=omics_iam_name,
            AssumeRolePolicyDocument=json.dumps(demo_trust_policy),
        )

        # Managed policy carrying the demo permissions.
        policy = iam.create_policy(
            PolicyName=f'{omics_iam_name}-policy',
            Description="Policy for AWS HealthOmics demo",
            PolicyDocument=json.dumps(demo_policy),
        )

        # Attach the policy to the role.
        policy.attach_role(RoleName=omics_iam_name)

Now that we know the role exists, let's create a helper function to retrieve the role ARN, which we will need to pass into the service API calls. The role ARN grants AWS HealthOmics the permissions it needs to access resources in your AWS account.

In [None]:
# Overrides the timestamped role name computed in the previous cell with a
# fixed one (presumably a role created by an earlier run — confirm). Replace
# it with a role name that exists in your account, or skip this cell to keep
# using the role created above.
omics_iam_name = 'OmicsTutorialRole-20250818T091459'

In [None]:
def get_role_arn(role_name):
    """Return the ARN of the IAM role named *role_name*.

    Args:
        role_name: Name of the IAM role to look up.

    Returns:
        str: The role's ARN.

    Raises:
        botocore.exceptions.ClientError: If the role cannot be loaded; a
            message is printed and the original error is re-raised.
    """
    try:
        iam = boto3.resource('iam')
        role = iam.Role(role_name)
        role.load()  # calls GetRole to load attributes
    # The file imports `botocore.exceptions` but never the bare name
    # `ClientError`, so the exception class must be referenced through it.
    except botocore.exceptions.ClientError:
        print("Couldn't get role named %s."%role_name)
        raise
    else:
        return role.arn

### Creating the AWS HealthOmics client

In [None]:
import boto3

# HealthOmics client created from the default boto3 session
omics = boto3.client('omics')

# Placeholder bucket name -- replace with your own source bucket
regional_bucket = '<YOUR_SOURCE_BUCKET>'

# Region the client was configured with
region = omics.meta.region_name

### Check the created variant store and vcf files imported into it

In [None]:
# Look up the variant store named 'genomicsvariantstore'; later cells read
# its 'id' and 'name' fields.
var_store = omics.get_variant_store(name='genomicsvariantstore')

In [None]:
# Show the variant import jobs filtered to this variant store's name
omics.list_variant_import_jobs(filter={"storeName": var_store['name']})

## Step 4 - Query in Athena

In order to query your data in Amazon Athena, you need to create resource links to your database using the AWS Lake Formation Console. You will also need to ensure that the IAM user running this notebook is a Data Lake Administrator. Note that without both of these in place, the following queries will fail. To satisfy these prerequisites, refer to the instructions provided in the AWS Lake Formation documentation and AWS HealthOmics documentation.

The following code will create resource links to the default database in the AwsDataCatalog in AWS Glue. It makes a few assumptions to do so, for example that the IAM identity you are using to run this notebook is a Data Lake Administrator and has permission to create AWS Glue tables.

If you want to be fully sure you are making the correct resource links and providing access to them to the correct identities it is best to create them directly. Refer to the instructions in the AWS HealthOmics documentation on how to do this.

We'll need to work with AWS RAM, AWS Glue, and AWS Lake Formation to setup resource links and grant database permissions.

In [None]:
# Clients for AWS Glue and AWS RAM, used by the resource-link cells below
glue = boto3.client('glue')
ram = boto3.client('ram')

# Account id and ARN of the identity behind the current credentials
caller_identity = boto3.client('sts').get_caller_identity()
AWS_IDENITY_ARN = caller_identity['Arn']
AWS_ACCOUNT_ID = caller_identity['Account']

First we'll list available shared resources from OTHER-ACCOUNTS in AWS RAM and look for the resource that matches the id of the Variant store we created above.

In [None]:
# Glue databases shared with this account through AWS RAM
response = ram.list_resources(resourceOwner='OTHER-ACCOUNTS', resourceType='glue:Database')

# Reset before searching: without this, a run that finds nothing either
# raises NameError on the last line or silently reuses a match left over
# from an earlier execution of the cell.
variantstore_resource = None

if not response.get('resources'):
    print('no shared resources found. verify that you have successfully created an HealthOmics Analytics store')
else:
    # Keep only the shares whose ARN contains the variant store id
    variantstore_resources = [shared for shared in response['resources'] if var_store['id'] in shared['arn']]
    if not variantstore_resources:
        print(f"no shared resources matching variant store id {var_store['id']} found")
    else:
        variantstore_resource = variantstore_resources[0]

# Displays the matching resource, or None when nothing matched
variantstore_resource

Next, we'll get the specific resource share the Variant store is associated with. This is so we can get the owningAccountId attribute for the share. (Note we could also do this by parsing the resourceShareArn for the resource above, but doing it this way is more explicit)

In [None]:
# Fetch the resource share that the variant store resource belongs to;
# its 'owningAccountId' is read by the cells that follow.
share_lookup = ram.get_resource_shares(
    resourceOwner='OTHER-ACCOUNTS',
    resourceShareArns=[variantstore_resource['resourceShareArn']],
)
resource_share = share_lookup['resourceShares'][0]
resource_share

Now we'll create a table in AWS Glue for the variant store. This is the same as creating a resource link in AWS Lake Formation.

In [None]:
# Create a resource link to the variant store table and add it to the
# `default` Glue database. The database name must be `default` because every
# Athena query in this notebook reads from "default"."<store name>".
glue.create_table(
    DatabaseName='default',
    TableInput={
        "Name": var_store['name'],
        # Target is the shared table owned by the account behind the RAM share
        "TargetTable": {
            "CatalogId": resource_share['owningAccountId'],
            "DatabaseName": f"variant_{AWS_ACCOUNT_ID}_{var_store['id']}",
            "Name": var_store['name'],
        },
    },
)

For this section of the tutorial, the identity that runs this notebook either:

1. needs to be a Data lake administrator in AWS Lake Formation, or
2. must be granted access to the AWS RAM shared resources by an existing administrator.
The latter pattern is recommended. Both DESCRIBE and SELECT on the target table for the variant store are required and can be granted via the "Grant on target" action on a resource link in the AWS Lake Formation Console. You can also do this with an admin identity via the SDK with code like:

In [None]:
sts = boto3.client('sts')
identity = sts.get_caller_identity()

# ARN of the caller as reported by STS
assumed_role_arn = identity['Arn']

# For an assumed-role session, convert
#   arn:aws:sts::<account>:assumed-role/<RoleName>/<SessionName>
# to the underlying role
#   arn:aws:iam::<account>:role/<RoleName>
# Any other principal (e.g. an IAM user) is used unchanged: stripping the
# last path segment from such an ARN would leave an invalid principal.
if ':assumed-role/' in assumed_role_arn:
    role_arn = (
        assumed_role_arn
        .replace(':sts:', ':iam:')
        .replace(':assumed-role/', ':role/')
        .rsplit('/', 1)[0]
    )
else:
    role_arn = assumed_role_arn

lfn = boto3.client('lakeformation')

# The shared variant store table that both grants apply to
target_table = {
    "CatalogId": resource_share['owningAccountId'],
    "DatabaseName": f"variant_{AWS_ACCOUNT_ID}_{var_store['id']}",
    "Name": var_store['name'],
}

# DESCRIBE on the target table
lfn.grant_permissions(
    Principal={
        "DataLakePrincipalIdentifier": role_arn
    },
    Resource={
        "Table": dict(target_table)
    },
    Permissions=['DESCRIBE'],
    PermissionsWithGrantOption=['DESCRIBE']
)

# SELECT on every column of the target table
lfn.grant_permissions(
    Principal={
        "DataLakePrincipalIdentifier": role_arn
    },
    Resource={
        "TableWithColumns": {**target_table, "ColumnWildcard": {}}
    },
    Permissions=['SELECT'],
    PermissionsWithGrantOption=['SELECT']
)

Now that we have resource links created, we can start querying the data using Amazon Athena. You don't need to wait for all the import jobs to complete to start doing this. Queries can be made while data imports in the background.

To query AWS HealthOmics Analytics stores, you need to use Athena engine version 3. The following code checks whether you have an existing Athena workgroup that satisfies this requirement. If not, it creates one called `omics`.

In [None]:
# Athena client used by the workgroup cells below
athena = boto3.client('athena')

In [None]:
# Fetch the workgroup entries returned by Athena and display them
athena_workgroups = athena.list_work_groups()['WorkGroups']
athena_workgroups

In [None]:
athena_workgroups = athena.list_work_groups()['WorkGroups']

# Pick the first workgroup whose effective engine is version 3; the `else`
# branch of the loop runs only when no workgroup matched (no `break`).
athena_workgroup = None
for wg in athena_workgroups:
    # Tolerate entries that carry no engine version information
    effective_engine = wg.get('EngineVersion', {}).get('EffectiveEngineVersion')
    print(effective_engine)
    if effective_engine == 'Athena engine version 3':
        print(f"Workgroup '{wg['Name']}' found using Athena engine version 3")
        athena_workgroup = wg
        break
else:
    print("No workgroups with Athena engine version 3 found. creating one")
    athena.create_work_group(
        Name='omics',
        Configuration={
            "EngineVersion": {
                "SelectedEngineVersion": "Athena engine version 3"
            }
        }
    )
    # The create call's response does not describe the workgroup, so read it
    # back; later cells rely on athena_workgroup['Name'].
    athena_workgroup = athena.get_work_group(WorkGroup='omics')['WorkGroup']

athena_workgroup

Let's start writing queries!

For fun, let's calculate the TI/TV ratio for these samples. You can navigate to the Athena console or do this from your Jupyter Notebook. This example uses the workgroup omics and assumes you have made a resource link to your Variant store in your default database.

In [None]:
!pip install awswrangler

In [None]:
import awswrangler as wr

In [None]:
# Preview query: first 10 rows of the table named after the variant store in
# the "default" database
simple_query = f"""SELECT * FROM "default"."{var_store['name']}" LIMIT 10"""

In [None]:
import awswrangler as wr

# Run the preview query through the selected workgroup. The database is
# `default`, matching the "default"."<store>" table the query reads from.
df_titv = wr.athena.read_sql_query(
    simple_query,
    database='default',
    workgroup=athena_workgroup['Name'])

In [None]:
# Lists the distinct sample ids in the variant store table (despite the
# variable name, no COUNT is applied in the SQL itself)
count_query = f"""SELECT DISTINCT sampleid FROM "default"."{var_store['name']}" """

In [None]:
# Run the distinct-sample query against the `default` database, which is
# where the query's "default"."<store>" table lives.
df_count = wr.athena.read_sql_query(
    count_query,
    database='default',
    workgroup=athena_workgroup['Name'])
df_count

In [None]:
!aws s3 cp s3://<YOUR_SOURCE_BUCKET>/dragen_vcfs2/NA21135.hard-filtered.vcf.gz s3://<YOUR_VCF_INPUT_BUCKET>/

In [None]:
!aws dynamodb scan --table-name genomics-vep-pipeline-tracking

In [None]:
!aws s3 cp s3://<YOUR_SOURCE_BUCKET>/dragen_vcfs2/NA21144.hard-filtered.vcf.gz s3://<YOUR_VCF_INPUT_BUCKET>/

# Annotation Store

Now, let's set up an Annotation store.

AWS HealthOmics Annotation stores support annotations in VCF, GFF, and TSV formats. In this tutorial, we import ClinVar annotations which can be downloaded from the NCBI as a VCF file. Imports need to come from an S3 location in the same region, so we'll use a copy in a regional bucket for this tutorial.

In [None]:
!wget https://ftp.ncbi.nlm.nih.gov/pub/clinvar/vcf_GRCh38/clinvar_20250810.vcf.gz

In [None]:
!aws s3 cp clinvar_20250810.vcf.gz s3://<YOUR_SOURCE_BUCKET>/clinvar20250810/

In [None]:
# S3 location of the ClinVar VCF uploaded in the previous cell
SOURCE_ANNOTATION_URI = (
    "s3://<YOUR_SOURCE_BUCKET>/clinvar20250810/"
    "clinvar_20250810.vcf.gz"
)

## Creating, importing data into, and querying an Annotation store

The process of creating, importing data into, and querying an Annotation store is similar to the process you did above for the Variant store, so we'll be brief on the descriptions of each step.

In [None]:
# Reference name -- change it to match a reference you have created if needed
ref_name = 'dragen-ref-genome3'

# Name of the annotation store used by the cells below
ann_store_name = 'genomicsannotationstore'

In [None]:
# Start importing the ClinVar VCF into the annotation store. The destination
# comes from ann_store_name so the store name is defined in one place only.
response = omics.start_annotation_import_job(
    destinationName=ann_store_name,
    roleArn=get_role_arn(omics_iam_name),
    items=[{"source": SOURCE_ANNOTATION_URI}],
)

In [None]:
response

In [None]:
# Show the annotation import jobs filtered to the annotation store's name
omics.list_annotation_import_jobs(filter={"storeName": ann_store_name})

In [None]:
# Look up the annotation store by the shared ann_store_name rather than a
# repeated literal; later cells read its 'id' and 'name' fields.
ann_store = omics.get_annotation_store(name=ann_store_name)

Creating a resource link to the Annotation store is the same as with the Variant store. We'll do this all in one cell below.

In [None]:
# Glue databases shared with this account through AWS RAM
response = ram.list_resources(resourceOwner='OTHER-ACCOUNTS', resourceType='glue:Database')

if not response.get('resources'):
    print('no shared resources found. verify that you have successfully created an HealthOmics Analytics store')
else:
    # Keep only the shares whose ARN contains the annotation store id
    annotationstore_resources = [shared for shared in response['resources'] if ann_store['id'] in shared['arn']]
    if not annotationstore_resources:
        print(f"no shared resources matching annotation store id {ann_store['id']} found")
    else:
        annotationstore_resource = annotationstore_resources[0]

        # The share supplies the owning account id used as the catalog id
        resource_share = ram.get_resource_shares(
            resourceOwner='OTHER-ACCOUNTS',
            resourceShareArns=[annotationstore_resource['resourceShareArn']])['resourceShares'][0]

        # Create a resource link to the annotation store table in the
        # `default` database, which is the database every Athena query in
        # this notebook reads from.
        glue.create_table(
            DatabaseName='default',
            TableInput={
                "Name": ann_store['name'],
                "TargetTable": {
                    "CatalogId": resource_share['owningAccountId'],
                    "DatabaseName": f"annotation_{AWS_ACCOUNT_ID}_{ann_store['id']}",
                    "Name": ann_store['name'],
                }
            }
        )

In [None]:
response

In [None]:
# Preview query: first 10 rows of the table named after the annotation store
# in the "default" database
annot_query = f"""SELECT * FROM "default"."{ann_store['name']}" LIMIT 10"""

In [None]:
# Run the annotation preview query against the `default` database, which is
# where the query's "default"."<store>" table lives.
df_annot = wr.athena.read_sql_query(
    annot_query,
    database='default',
    workgroup=athena_workgroup['Name'])
df_annot

In [None]:
# Join query between the variant store table (alias v) and the annotation
# store table (alias a), both in the "default" database.
# - Rows are matched on contig name (with 'chr' removed from the variant
#   side), start, reference allele and alternate alleles.
# - The CLNSIG attribute is mapped to a display label, with 'Unknown' for any
#   value not listed in the CASE.
# - gene_name is the part of GENEINFO before the first ':'.
# - Only annotation rows with a non-null CLNSIG attribute are kept.
clin_query = f"""SELECT 
    v.sampleid,
    v.contigname,
    v.start,
    v.referenceallele,
    v.alternatealleles,
    a.names as clinvar_variation_id,
    -- Extract clinical significance from attributes map
    CASE 
        WHEN a.attributes['CLNSIG'] = 'Pathogenic' THEN 'Pathogenic'
        WHEN a.attributes['CLNSIG'] = 'Likely_pathogenic' THEN 'Likely Pathogenic'
        WHEN a.attributes['CLNSIG'] = 'Uncertain_significance' THEN 'Uncertain Significance'
        WHEN a.attributes['CLNSIG'] = 'Likely_benign' THEN 'Likely Benign'
        WHEN a.attributes['CLNSIG'] = 'Benign' THEN 'Benign'
        ELSE 'Unknown'
    END as clinical_significance,
    -- Extract gene name from GENEINFO (split on colon and take first part)
    CASE 
        WHEN a.attributes['GENEINFO'] IS NOT NULL 
        THEN split_part(a.attributes['GENEINFO'], ':', 1)
        ELSE NULL
    END as gene_name,
    -- Keep full gene info for reference
    a.attributes['GENEINFO'] as full_gene_info
FROM "default"."{var_store['name']}" v
INNER JOIN "default"."{ann_store['name']}" a ON (
    REPLACE(v.contigname, 'chr', '') = a.contigname
    AND v.start = a.start
    AND v.referenceallele = a.referenceallele
    AND v.alternatealleles = a.alternatealleles
)
WHERE a.attributes IS NOT NULL 
    AND a.attributes['CLNSIG'] IS NOT NULL"""

In [None]:
# Run the variant/ClinVar join built in the previous cell (clin_query) against
# the `default` database, which is where both joined tables are referenced.
v_explore_annot = wr.athena.read_sql_query(
    clin_query,
    database='default',
    workgroup=athena_workgroup['Name'])
v_explore_annot