# METAL


# Load and Setup

In [1]:
# Load libraries
from IPython.display import display, HTML
import pandas as pd
import fastparquet
import polars as pl
import scipy
import numpy as np
import copy
import re
from pathlib import Path

import os
import subprocess

version = %env WORKSPACE_CDR
my_bucket = os.getenv('WORKSPACE_BUCKET')

In [2]:
conditions = ["afib", "allergies", "asthma", "cholelithiasis", "t2d", "fibroids"]
categories = ['ehr', 'survey', 'ehr_and_survey', 'ehr_or_survey']

traits = {}
for disease in conditions:
    for condition in categories:
        traits[f"{disease}_{condition}"] = 'binary'

In [3]:
## output folders
output_folder = f'{my_bucket}/data/saige_gwas/v0'

In [10]:
# test_df = pl.read_csv(f'{output_folder}/amr/allergies_ehr/gwas_results.tsv.gz', separator='\t')

# Function

In [4]:
def gcs_file_exists(gs_path):
    """Check if specific GCS file exists using gsutil ls"""
    try:
        result = subprocess.run(f'gsutil ls {gs_path}', 
                              shell=True, capture_output=True)
        return result.returncode == 0
    except:
        return False

def validate_saige_inputs(trait, ancestries, base_output_folder):
    """
    Validate SAIGE output files for METAL meta-analysis
    """
    required_file = "gwas_results.tsv.gz"
    validated_inputs = {}
    
    for anc in ancestries:
        in_dir = f"{base_output_folder}/{anc}/{trait}"
        file_path = f"{in_dir}/{required_file}"
    
        # Check file exists
        if not gcs_file_exists(file_path):
            print(f"Missing {required_file} for ancestry {anc}")
            continue
            
        # Extract sample sizes from the file
        sample_info = extract_sample_sizes(file_path)
        if not sample_info:
            print(f"Could not extract sample sizes for ancestry {anc}")
            continue

        validated_inputs[anc] = {
            'path': file_path,
            'n_cases': sample_info['n_cases'],
            'n_controls': sample_info['n_controls'], 
            'n_total': sample_info['n_total']
        }
        
        print(f"Ancestry {anc}: {sample_info['n_cases']} cases, {sample_info['n_controls']} controls (total: {sample_info['n_total']})")
    
    if len(validated_inputs) < 2:
        print(f"Error: Need at least 2 ancestries for meta-analysis. Found: {len(validated_inputs)}")
        return None
    
    print(f"Validated {len(validated_inputs)} studies for {trait} \n")    
    return validated_inputs

def extract_sample_sizes(file_path):
    """
    Extract n_cases and n_controls from first data row of SAIGE file
    Returns dict with sample size info or None if failed
    """
    try:
        # Read just the first few lines to get sample sizes
        cmd = f"gsutil cat '{file_path}' | gunzip | head -2 2>/dev/null"
        result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
        
        if not result.stdout:
            print(f"No output from command for {file_path}")
            return None
        lines = result.stdout.strip().split('\n')

        if len(lines) < 2:
            print(f"File {file_path} has insufficient data")
            return None
            
        header = lines[0].split('\t')
        data = lines[1].split('\t')
        
        # Find column indices
        try:
            n_cases_idx = header.index('n_cases')
            n_controls_idx = header.index('n_controls')
        except ValueError as e:
            print(f"Missing required columns in {file_path}: {e}")
            return None
        
        # Extract values
        n_cases = int(float(data[n_cases_idx]))
        n_controls = int(float(data[n_controls_idx]))
        n_total = n_cases + n_controls
        
        return {
            'n_cases': n_cases,
            'n_controls': n_controls,
            'n_total': n_total
        }
        
    except Exception as e:
        print(f"Error extracting sample sizes from {file_path}: {e}")
        return None

def print_dsub_readable(cmd):
    """
    Simple readable format - newline before each --
    """
    readable_cmd = cmd.replace(' --', ' \\\n    --')
    print(readable_cmd)
    print()  # Extra line for separation
    
def dsub_script(
    label,
    machine_type,
    envs,
    in_params,
    out_params,
    out_dirs,
    boot_disk = 100,
    disk_size = 150,
    image = 'us.gcr.io/broad-dsp-gcr-public/terra-jupyter-aou:2.2.14',
    script = 'run_metal.sh',
    preemptible = True
):
    
    # get useful info
    dsub_user_name = os.getenv("OWNER_EMAIL").split('@')[0]
    
    user_name = os.getenv("OWNER_EMAIL").split('@')[0].replace('.','-')

    job_name = f'{label}'
    
    dsub_cmd = 'dsub '
    dsub_cmd += '--provider google-batch '
    dsub_cmd += '--user-project "${GOOGLE_PROJECT}" '
    dsub_cmd += '--project "${GOOGLE_PROJECT}" '
    dsub_cmd += '--image "{}" '.format(image)
    dsub_cmd += '--network "global/networks/network" '
    dsub_cmd += '--subnetwork "regions/us-central1/subnetworks/subnetwork" '
    dsub_cmd += '--service-account "$(gcloud config get-value account)" '
    dsub_cmd += '--use-private-address '
    dsub_cmd += '--user "{}" '.format(dsub_user_name)
    dsub_cmd += '--regions us-central1 '
    dsub_cmd += '--logging "${WORKSPACE_BUCKET}/dsub/logs/{job-name}/{user-id}/$(date +\'%Y%m%d\')/{job-id}-{task-id}-{task-attempt}.log" '
    dsub_cmd += ' "$@" '
    dsub_cmd += '--name "{}" '.format(job_name)
    dsub_cmd += '--machine-type "{}" '.format(machine_type)
    
    if preemptible:
        dsub_cmd += '--preemptible '
        
    if 'c4' in machine_type:
        raise ValueError(
            f"c4 machine types ('{machine_type}') are not supported with dsub. "
            f"c4 requires hyperdisk-balanced boot disks, but dsub doesn't allow "
            f"setting boot disks. Use c2 or n2 instead."
        )

#        # c4 doesn't use pd-ssd
#         dsub_cmd += '--disk-type "hyperdisk-balanced" '
#     else:
#         dsub_cmd += '--disk-type "pd-ssd" '
        
    dsub_cmd += '--boot-disk-size {} '.format(boot_disk)
    dsub_cmd += '--disk-size {} '.format(disk_size)
    dsub_cmd += '--script "{}" '.format(script)
    
    # Assign any environmental conditions
    for env_key in envs.keys():
        dsub_cmd += '--env {}="{}" '.format(env_key, envs[env_key])
        
    # Assign any inputs
    for in_key in in_params.keys():
        dsub_cmd += '--input {}="{}" '.format(in_key, in_params[in_key])
        
    # Assign any outputs
    if out_params != None:
        for out_key in out_params.keys():
            dsub_cmd += '--output {}="{}" '.format(out_key, out_params[out_key])
        
    for out_key in out_dirs.keys():
        dsub_cmd += '--output-recursive {}="{}" '.format(out_key, out_dirs[out_key])

    os.system(dsub_cmd)
#     print_dsub_readable(dsub_cmd)

In [5]:
def validate_age_format(age: str) -> bool:
    """
    Validate age format for dsub dstat command
    """
    # Pattern: one or more digits followed by exactly one valid unit
    pattern = r'^\d+[smhdw]$'
    return bool(re.match(pattern, age.lower()))

In [6]:
def check_dsub_status(user: str = None, full: bool = False, age: str = '1d') -> subprocess.CompletedProcess:
    """
    Check status of dsub jobs for the specified user
    
    Parameters:
    -----------
    user : str, optional
        Username to check jobs for. Defaults to current user from OWNER_EMAIL
    full : bool, default False
        Include full job details in output
    age : str, default '1d'
        Maximum age of jobs to display. Format: <integer><unit>
        Units: s (seconds), m (minutes), h (hours), d (days), w (weeks)
        Examples: '3d', '12h', '30m', '7w'
        
    Returns:
    --------
    subprocess.CompletedProcess
        Result of the dstat command
        
    Examples:
    ---------
    >>> check_dsub_status(age='3d', full=True)  # Last 3 days, full details
    >>> check_dsub_status()  # Default: last day, summary view
    """
    
    if user is None:
        # Get current user if not specified
        user = os.getenv("OWNER_EMAIL").split('@')[0]
    
    project = os.getenv("GOOGLE_PROJECT")
    
    # Validate age parameter
    if age is not None:
        if not validate_age_format(age):
            raise ValueError(
                f"Invalid age format: '{age}'. "
                "Expected format: <integer><unit> where unit is one of: s, m, h, d, w. "
                "Examples: '3d', '12h', '30m', '7w'"
            )
    
    # Build command
    cmd_parts = [
        "dstat",
        "--provider google-batch",
        f"--user {user}",
        "--status '*'",
        f"--project {project}"
    ]
    
    if full:
        cmd_parts.append("--full")
    
    if age:
        cmd_parts.append(f"--age {age}")
    
    cmd = " ".join(cmd_parts)
    print(f"Running: {cmd}")
    return subprocess.run(cmd, shell=True, capture_output=False)

In [7]:
def cancel_running_jobs():
    """Cancel only running/pending jobs (safer)"""
    project = os.getenv("GOOGLE_PROJECT")
    
    # Cancel only running jobs
    cancel_cmd = f"ddel --provider google-batch --project {project} --users 'bwaxse' --jobs '*'"
    print(f"Canceling running jobs: {cancel_cmd}")
    
    return subprocess.run(cancel_cmd, shell=True, capture_output=False)

In [8]:
def job_details(user=None, job=None):
    """List all jobs for the user, including failed ones"""
    project = os.getenv("GOOGLE_PROJECT")
    
    if user is None:
        user = os.getenv("OWNER_EMAIL").split('@')[0]
        
    if job is None:
        job = "'*' "
    else:
        job = f'--jobs {job} '
    
    cmd = f"dstat --provider google-batch --project {project} --user {user} --status {job}--full"
    print(f"Running: {cmd}")
    return subprocess.run(cmd, shell=True, capture_output=False)

In [9]:
def view_dsub_logs(log_path):
    base_path = log_path.replace('.log', '')
    
    print("=== STDOUT ===")
    subprocess.run(['gsutil', 'cat', f'{base_path}-stdout.log'])
    
    print("\n=== STDERR ===") 
    subprocess.run(['gsutil', 'cat', f'{base_path}-stderr.log'])

# Run METAL

In [36]:
%%writefile run_metal.sh

#!/bin/bash
set -euo pipefail

echo "Starting METAL meta-analysis for ${TRAIT}"
echo "Number of input files: ${N_FILES}"

# Parse ancestries
IFS=',' read -ra INPUTS <<< "${INPUTS}"

# Parse sample sizes
IFS=',' read -ra SIZES <<< "${SAMPLE_SIZES}"

echo "Preprocessing files to add ancestry-specific n_total columns..."
PROCESSED_FILES=()

# Pre-processing adds sample size via n_cases and n_controls extracted from SAIGE GWAS results 
for i in "${!INPUTS[@]}"; do
    var_name="${INPUTS[$i]}"
    input_file="${!var_name}"
    n_total="${SIZES[$i]}"
    processed_file="/tmp/processed_file_${i}.tsv"
    
    echo "Processing file $((i+1)): ${input_file} (n_total=${n_total})"
    
    # Check if file exists
    if [[ ! -f "${input_file}" ]]; then
        echo "ERROR: Input file not found: ${input_file}"
        echo "Available files in /mnt/data/input/:"
        ls -la /mnt/data/input/ || echo "Input directory not accessible"
        exit 1
    fi
    
    # Add constant n_total column for this ancestry
    if [[ "${input_file}" == *.gz ]]; then
        gzip -dc "${input_file}"
    else
        cat "${input_file}"
    fi | awk -F'\t' -v OFS='\t' -v n_total="${n_total}" '
        NR==1 { print $0, "n_total"; next }
               { print $0, n_total }' > "${processed_file}"
    
    PROCESSED_FILES+=("/processed_file_${i}.tsv")
    
    # Verify output
    n_lines=$(wc -l < "${processed_file}")
    echo "  Created ${processed_file} with ${n_lines} lines"
done

# Convert processed files array to comma-separated string
PROCESSED_FILES_STR=$(IFS=','; echo "${PROCESSED_FILES[*]}")
echo "Running METAL with processed files: ${PROCESSED_FILES_STR}"

# Create a METAL script dynamically
METAL_SCRIPT="${OUTPUT_PATH}/metal_script.txt"
cat > "${METAL_SCRIPT}" <<EOF
MARKER vid
WEIGHT n_total
ALLELE effect_allele other_allele
FREQ effect_allele_frequency
PVAL p_value
EFFECT beta
STDERR standard_error
SEPARATOR TAB
SCHEME STDERR

# Auto-flip alleles based on frequency
AVERAGEFREQ ON
MINMAXFREQ ON

$(for file in "${PROCESSED_FILES[@]}"; do echo "PROCESS $file"; done)

OUTFILE ${OUT_PREF} .tsv
ANALYZE HETEROGENEITY
QUIT
EOF

echo "Metal script created"

# Run METAL
metal "${METAL_SCRIPT}"

mv ${OUT_PREF}1.tsv "${OUTPUT_PATH}/"
mv ${OUT_PREF}1.tsv.info "${OUTPUT_PATH}/"

# List all output files
echo "Final contents of OUTPUT_PATH (${OUTPUT_PATH}):"
ls -lh "${OUTPUT_PATH}" || echo "OUTPUT_PATH not accessible"

echo "METAL analysis completed successfully for ${TRAIT}"

Overwriting run_metal.sh


In [11]:
def run_metal(trait, ancestries, base_output_folder):
    """
    Run METAL meta-analysis with SAIGE-formatted inputs
    """
    artifact_registry = os.getenv('ARTIFACT_REGISTRY_DOCKER_REPO', '')
            
    # Validate SAIGE inputs
    validated_inputs = validate_saige_inputs(trait, ancestries, base_output_folder)
    if not validated_inputs:
        return None

    out_dir = f'{base_output_folder}/metal/{trait}'

    # METAL configuration
    env_dict = {
        'TRAIT': trait,
        'OUT_PREF': trait,
    }

    in_dict = {}
    file_list = []
    sample_sizes = []
    
    for i, (anc, file_info) in enumerate(validated_inputs.items(), 1):
        in_dict[f'INPUT_{anc.upper()}'] = file_info['path']
        file_list.append(file_info['path'])
        sample_sizes.append(str(file_info['n_total']))
    
    # Pass file list and sample sizes as comma-separated strings
    env_dict['INPUTS'] = ",".join(in_dict.keys())
    env_dict['SAMPLE_SIZES'] = ','.join(sample_sizes)  # n_total per ancestry
    env_dict['N_FILES'] = str(len(file_list))

    out_dirs = {
        'OUTPUT_PATH': out_dir
    }

    return dsub_script(
        label=f'metal_{trait}',
        machine_type='n2d-standard-8',
        envs=env_dict,
        in_params=in_dict,
        out_params=None,
        out_dirs=out_dirs,
        boot_disk=100,
        disk_size=150,
        image=f'{artifact_registry}/bwaxse/metal',
        script='run_metal.sh',
        preemptible=True
    )

## Run METAL

In [37]:
# Ancestry-specific null model
for trait in ['allergies_ehr']:
    _ = run_metal(
        trait=trait,
        ancestries=['eas', 'amr'], #'amr', 'afr', 'eur', 
        base_output_folder=output_folder
    )

Ancestry eas: 643 cases, 4491 controls (total: 5134)
Ancestry amr: 3634 cases, 37905 controls (total: 41539)
Validated 2 studies for allergies_ehr 



Job properties:
  job-id: metal-alle--bwaxse--xxxx-xxxxx
  job-name: metal-allergies-ehr
  user-id: bwaxse


metal-alle--bwaxse--xxxx-xxxxx


Provider internal-id (operation): projects/project/locations/us-central1/jobs/metal-alle--bwaxse--xxxx-xxxxx-0-0
Launched job-id: metal-alle--bwaxse--xxxx-xxxxx
To check the status, run:
  dstat --provider google-batch --project project --location us-central1 --jobs 'metal-alle--bwaxse--xxxx-xxxxx' --users 'bwaxse' --status '*'
To cancel the job, run:
  ddel --provider google-batch --project project --location us-central1 --jobs 'metal-alle--bwaxse--xxxx-xxxxx' --users 'bwaxse'


In [46]:
check_dsub_status(full=False)

Running: dstat --provider google-batch --user bwaxse --status '*' --project project --age 1d
Job Name         Status                                      Last Update
---------------  ------------------------------------------  --------------
metal-allerg...  Job state is set from RUNNING to SUCCEE...  08-26 19:03:40
s2-eur-7-all...  Job state is set from SCHEDULED to RUNN...  08-26 18:53:01



CompletedProcess(args="dstat --provider google-batch --user bwaxse --status '*' --project project --age 1d", returncode=0)

In [47]:
job_details(job='metal-alle--bwaxse--xxxx-xxxxx')

[output removed]

CompletedProcess(args='dstat --provider google-batch --project project --user bwaxse --status --jobs metal-alle--bwaxse--xxxx-xxxxx --full', returncode=0)

In [None]:
log="{my_bucket}/dsub/logs/metal-allergies-ehr/bwaxse/20250826/metal-alle--bwaxse--xxxx-xxxxx-task-None.log"
view_dsub_logs(log)

In [45]:
!gsutil ls {my_bucket}/data/saige_gwas/v0/metal/allergies_ehr/

{my_bucket}/data/saige_gwas/v0/metal/allergies_ehr/allergies_ehr1.tsv
{my_bucket}/data/saige_gwas/v0/metal/allergies_ehr/allergies_ehr1.tsv.info
{my_bucket}/data/saige_gwas/v0/metal/allergies_ehr/metal_script.txt


In [210]:
# cancel_running_jobs()

In [228]:
# """Cancel only running/pending jobs (safer)"""
# project = os.getenv("GOOGLE_PROJECT")

# # Cancel only running jobs
# cancel_cmd = f"ddel --provider google-batch --project {project} --users 'bwaxse' --jobs 'metal-alle--bwaxse--xxxxx-xxxxx'"
# print(f"Canceling running jobs: {cancel_cmd}")

# subprocess.run(cancel_cmd, shell=True, capture_output=False)