**Set environment**

In [1]:
source ../run_config_project.sh
show_env

BASE DIRECTORY (FD_BASE):      /hpc/group/igvf/kk319
REPO DIRECTORY (FD_REPO):      /hpc/group/igvf/kk319/repo
WORK DIRECTORY (FD_WORK):      /hpc/group/igvf/kk319/work
DATA DIRECTORY (FD_DATA):      /hpc/group/igvf/kk319/data
CONTAINER DIR. (FD_SING):      /hpc/group/igvf/kk319/container

You are working with           
PATH OF PROJECT (FD_PRJ):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR
PROJECT RESULTS (FD_RES):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/results
PROJECT SCRIPTS (FD_EXE):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/scripts
PROJECT DATA    (FD_DAT):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/data
PROJECT NOTE    (FD_NBK):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/notebooks
PROJECT DOCS    (FD_DOC):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/docs
PROJECT LOG     (FD_LOG):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/log
PROJECT REF     (FD_REF):      /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/references
PR

In [1]:
pwd

/hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/notebooks/analysis_variant_motif_richard


In [3]:
ls /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/scripts

config_path_duke_dcc_igvf.sh           [0m[01;32mrun_alphagenome_variant_prediction.sh[0m
config_project.py                      [01;32mrun_script.sh[0m
config_project.R                       run_setup_symlinks.sh
config_project.sh                      test_alphagenome_dnaclient.py
fun_utils.sh                           test_alphagenome_dnaclient.slurm
[01;34m__pycache__[0m                            [01;32mtest_script_cpu.sh[0m
[01;32mrun_alphagenome_variant_prediction.py[0m


In [4]:
cat /hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/scripts/run_script.sh

#!/bin/bash

#########################################
### Wrapper of project container

### container image
DIR="/hpc/group/igvf/kk319/container/project"
IMG="singularity_proj_igvf_bluestarr.sif"
APP="${DIR}/${IMG}"

### get command and arguments
### stored in an array to preserve each argument exactly as-is
CMD=("$@")

### execute the command
singularity exec \
    --env NUMBA_CACHE_DIR=/tmp \
    -B ${PWD} \
    -B /hpc:/hpc \
    -B /datacommons:/datacommons \
    ${APP} "${CMD[@]}"


In [14]:
${FP_APP} python - <<'EOF'
### set environment
import numpy as np
import os, time
import argparse
import statistics

from Bio import SeqIO
from concurrent.futures import ProcessPoolExecutor, as_completed

def one_hot_encode(txt_seq, txt_alphabet = "ACGT"):
    """
    one hot encoding of sequence (L,) to matrix (L,4)

    Parameters
    ----------
    txt_seq : str
        DNA sequence (string of A/C/G/T).
    txt_alphabet : str
        order of base for one-hot encoding

    Returns
    -------
    np.ndarray
        matrix of one-hot encoded sequence
    """
    ### init array for sequence matrix
    arr = np.zeros((len(txt_seq), len(txt_alphabet)), dtype=np.float32)
    
    ### set base-position map for encoding sequence
    dct = {base: idx for idx, base in enumerate(txt_alphabet)}

    ### iterate through sequence and perform one-hot encoding
    for idx, base in enumerate(txt_seq):
        if base in dct:
            arr[idx, dct[base]] = 1.0
    return arr

def decode_one_hot(X, txt_alphabet="ACGT"):
    """Helper function to turn one-hot encoded matrix back to seuqence"""
    return "".join(txt_alphabet[idx] for idx in X.argmax(axis=1))

def reverse_complement_matrix(arr_seq_Nx4, txt_alphabet = "ACGT"):
    """
    Reverse-complement a position×base matrix of shape (N,4).
    The input matrix reversed along positions and with columns 
    swapped A <-> T and C <-> G.

    Parameters
    ----------
    arr_seq_Nx4 : 
        one-hot enocded matrix of a sequence with shape (N, 4).

    Returns
    -------
    np.ndarray
        one-hot enocded matrix of the Reverse-complement sequence with shape (N, 4)
    """
    ### sanity check: alphabet must contain A, C, G, T
    required = set("ACGT")
    if not required.issubset(set(txt_alphabet)):
        raise ValueError(f"Alphabet must contain A,C,G,T. Got: {txt_alphabet}")
        
    ### map base -> column index
    dct_base_index = {base: idx for idx, base in enumerate(txt_alphabet)}
    
    ### define complement mapping
    dct_complement = {"A": "T", "C": "G", "G": "C", "T": "A"}
    
    ### build index list for complement columns, preserving input order
    lst_base_index_rc = [dct_base_index[dct_complement[base]] for base in txt_alphabet]

    ### flip positions + swap bases
    arr_seq_rc = arr_seq_Nx4[::-1, lst_base_index_rc]
    return arr_seq_rc

def scan_motif(txt_seq, arr_motif_Wx4):
    """
    Scan a motif across a sequence using sliding windows.

    Parameters
    ----------
    txt_seq : str
        DNA sequence (string of A/C/G/T).
    arr_motif_Wx4 : np.ndarray
        Motif log-odds matrix of shape (W,4).

    Returns
    -------
    np.ndarray
        Vector of scores, length = L - W + 1
    """
    # one hot encode sequence and get sequence/motif length
    X = one_hot_encode(txt_seq)  # shape (L,4)
    L = X.shape[0]
    W = arr_motif_Wx4.shape[0]

    if L < W:
        raise ValueError(f"Sequence length {L} shorter than motif length {W}")

    # build sliding windows: shape (L-W+1, W, 4)
    arr_windows = np.lib.stride_tricks.sliding_window_view(X, (W,4))
    arr_windows = arr_windows.reshape(-1, W, 4)

    # multiply each window with motif score and sum
    arr_scores = np.tensordot(arr_windows, arr_motif_Wx4, axes=([1,2],[0,1]))

    return arr_scores # shape (L-W+1,)

def scan_motif_both_strands(txt_seq, arr_motif_Wx4, txt_alphabet = "ACGT"):
    """
    Scan motif on both forward and reverse-complement strands.

    Returns
    -------
    tuple of np.ndarray
        (scores_forward, scores_reverse)
    """
    ### Forward motif scanning
    arr_score_fwd = scan_motif(txt_seq, arr_motif_Wx4)

    ### Reverse motif scanning by reverse complement motif
    arr_motif_rc   = reverse_complement_matrix(arr_motif_Wx4, txt_alphabet=txt_alphabet)
    arr_score_rev = scan_motif(txt_seq, arr_motif_rc)

    return arr_score_fwd, arr_score_rev

def scan_all_motifs_serial(txt_seq, dct_arr_motif_Wx4):
    """
    Scan all motifs on a single sequence (serial version).
    Returns dict[motif_name] -> {"forward": fwd_scores, "reverse": rev_scores}.
    """
    dct_results = {}
    for txt_motif_name, arr_motif_Wx4 in dct_arr_motif_Wx4.items():
        arr_scores_fwd, arr_scores_rev = scan_motif_both_strands(txt_seq, arr_motif_Wx4)
        dct_results[txt_motif_name] = {"forward": arr_scores_fwd, "reverse": arr_scores_rev}
    return dct_results

def scan_one_sequence(txt_seq, dct_arr_motif_Wx4):
    return scan_all_motifs_serial(txt_seq, dct_arr_motif_Wx4)

def scan_sequence_batch_serial(lst_seq_record, dct_arr_motif_Wx4):
    """
    Scan a batch of sequences (serial version)
    """
    ### init
    dct_results = {}

    ###
    for seq in lst_seq_record:
        dct_results[seq.id] = scan_one_sequence(str(seq.seq), dct_arr_motif_Wx4)
        
    return dct_results

def scan_sequence_batch_parallel():
    """
    Scan a batch of sequences (parallele version)
    """
    pass


def main(args):
    """Main function"""
    
    ### ======================================
    ### import data
    ### --------------------------------------
    lst_seq = list(SeqIO.parse(args.txt_fpath_fasta, "fasta"))
    
    obj = np.load(txt_fpath_motif, allow_pickle=True)
    dct_motif_lods = obj["lods"].item() # dict {name: (W,4)}
    arr_motif_bg   = obj["bg"]
    arr_motif_name = obj["names"]
    txt_alphabet   = obj["alphabet"]

    print("#{Motifs}: ", len(dct_motif_lods), "motifs")
    print("Alphabet:  ", txt_alphabet)
    print("Background:", arr_motif_bg)
    print("Motif name:")
    print(arr_motif_name[:6], "...")

    s = "ACGTT"                         # <-- not a palindrome under RC
    X = one_hot_encode(s)
    X_rc = reverse_complement_matrix(X, "ACGT")
    
    print("seq:   ", s)
    print("RC(seq)", "AACGT")           # expected by hand
    print("dec(X):", decode_one_hot(X))
    print("dec(RC):", decode_one_hot(X_rc))
    # sanity: decode_one_hot(X_rc) should be "AACGT"
    
    ### ======================================
    ### Benchmark: scan sequences (serial)
    ### --------------------------------------

    ### ======================================
    ### Benchmark: scan sequences (parallel)
    ### --------------------------------------

if __name__ == "__main__":

    ### get motif file directory
    txt_fdiry = "/hpc/group/igvf/kk319/data"
    txt_fdiry = os.path.join(txt_fdiry, "jaspar2024")
    
    txt_fname = "JASPAR2024_CORE_vertebrates_non-redundant.npz"
    txt_fpath = os.path.join(txt_fdiry, txt_fname)
    
    txt_fpath_motif = txt_fpath

    ### get fasta file directory
    txt_fdiry = "/hpc/group/igvf/kk319/repo/Proj_IGVF_BlueSTARR/results"
    txt_fdiry = os.path.join(txt_fdiry, "analysis_variant_motif_richard")
    txt_fdiry = os.path.join(txt_fdiry, "batches_dev")

    txt_fname = "variant_closed_gof_bluestarr_flank35_dev1000_ref.fa"
    txt_fpath = os.path.join(txt_fdiry, txt_fname)
    
    txt_fpath_fasta = txt_fpath

    ### parse arguments
    ### future: pass the argument via argparse
    #parser = argparse.ArgumentParser()
    #args = parser.parse_args()

    args = argparse.Namespace(
        txt_fpath_motif = txt_fpath_motif,
        txt_fpath_fasta = txt_fpath_fasta
    )
    
    ### Run main function
    main(args)
EOF

#{Motifs}:  879 motifs
Alphabet:   ACGT
Background: [0.25 0.25 0.25 0.25]
Motif name:
['MA0002.3 Runx1' 'MA0003.5 TFAP2A' 'MA0004.1 Arnt' 'MA0006.2 Ahr::Arnt'
 'MA0007.4 Ar' 'MA0009.2 TBXT'] ...
seq:    ACGTT
RC(seq) AACGT
dec(X): ACGTT
dec(RC): AACGT


In [None]:
run_memelite python - <<'EOF'
import numpy as np
import os, time
import statistics

from memelite.io    import read_meme
from memelite.utils import one_hot_encode

from concurrent.futures import ProcessPoolExecutor, as_completed

def pwm_to_logodds(arr_motif_pwm_4xW, bg=(0.25,0.25,0.25,0.25), eps=1e-6):
    """
    Convert PWM (4,W) to log-odds (W,4).

    Parameters
    ----------
    arr_motif_pwm_4xW : 
        Motif pwm matrix of shape (4,W).

    Returns
    -------
    np.ndarray
        Motif log-odds matrix of shape (W,4).
    """
    # Convert dtype and clip to avoid log2(0) = -inf
    arr_motif_pwm = arr_motif_pwm_4xW.astype(float)
    arr_motif_pwm = np.clip(arr_motif_pwm, eps, 1.0)

    # calculate the log-odds scores relative to background
    arr_motif_lod = np.log2(arr_motif_pwm / np.array(bg)[:,None])
    arr_motif_lod = arr_motif_lod.T # (4,W -> W,4)
    return arr_motif_lod # (W,4)

def reverse_complement_matrix(arr_seq_Nx4):
    """
    Reverse-complement any sequence matrix (N,4).
    Assumes columns order = [A, C, G, T].

    Parameters
    ----------
    arr_motif_Wx4 : 
        one-hot enocded matrix of a sequence with shape (N, 4).

    Returns
    -------
    np.ndarray
        one-hot enocded matrix of the Reverse-complement sequence with shape (N, 4)
    """
    lst_base_order = [3, 2, 1, 0]   # T,G,C,A
    arr_seq_rc = arr_seq_Nx4[::-1, lst_base_order] # flip positions + swap bases
    return arr_seq_rc

def scan_motif(txt_seq, arr_motif_Wx4):
    """
    Scan a motif across a sequence using sliding windows.

    Parameters
    ----------
    txt_seq : str
        DNA sequence (string of A/C/G/T).
    arr_motif_Wx4 : np.ndarray
        Motif log-odds matrix of shape (W,4).

    Returns
    -------
    np.ndarray
        Vector of scores, length = L - W + 1
    """
    # one hot encode sequence and get sequence/motif length
    X = one_hot_encode(txt_seq).T # (L,4)
    L, W = X.shape[0], arr_motif_Wx4.shape[0]

    if L < W:
        raise ValueError(f"Sequence length {L} shorter than motif length {W}")

    # build sliding windows: shape (L-W+1, W, 4)
    arr_windows = np.lib.stride_tricks.sliding_window_view(X, (W,4))
    arr_windows = arr_windows.reshape(-1, W, 4)

    # multiply each window with motif score and sum
    arr_scores = np.tensordot(arr_windows, arr_motif_Wx4, axes=([1,2],[0,1]))

    return arr_scores # shape (L-W+1,)

def scan_motif_both_strands(txt_seq, arr_motif_Wx4):
    """
    Scan motif on both forward and reverse-complement strands.

    Returns
    -------
    tuple of np.ndarray
        (scores_forward, scores_reverse)
    """
    # Forward motif scanning
    arr_score_fwd = scan_motif(txt_seq, arr_motif_Wx4)

    # Reverse motif scanning by reverse complement motif
    arr_motif_rc   = reverse_complement_matrix(arr_motif_Wx4)
    arr_score_rev = scan_motif(txt_seq, arr_motif_rc)

    return arr_score_fwd, arr_score_rev

def scan_all_motifs_serial(txt_seq, dct_arr_motif_pwm):
    """
    Scan all motifs on a single sequence (serial version).
    Returns dict[motif_name] -> {"forward": fwd_scores, "reverse": rev_scores}.
    """
    dct_results = {}
    for txt_motif_name, arr_motif_pwm_4xW in dct_arr_motif_pwm.items():
        arr_motif_lod_Wx4 = pwm_to_logodds(arr_motif_pwm_4xW)
        arr_scores_fwd, arr_scores_rev = scan_motif_both_strands(txt_seq, arr_motif_lod_Wx4)
        dct_results[txt_motif_name] = {"forward": arr_scores_fwd, "reverse": arr_scores_rev}
    return dct_results

def scan_one_motif(txt_motif_name, arr_motif_pwm_4xW, txt_seq):
    """Helper: scan one motif on one sequence."""
    arr_motif_lod_Wx4 = pwm_to_logodds(arr_motif_pwm_4xW)
    arr_scores_fwd, arr_scores_rev = scan_motif_both_strands(txt_seq, arr_motif_lod_Wx4)
    return txt_motif_name, arr_scores_fwd, arr_scores_rev

def scan_all_motifs_parallel(txt_seq, dct_arr_motif_pwm, max_workers=8):
    """
    Scan all motifs on a single sequence (parallel version).
    Returns dict[motif_name] -> {"forward": fwd_scores, "reverse": rev_scores}.
    """
    dct_results = {}
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        futures = [
            ex.submit(scan_one_motif, name, pwm, txt_seq)
            for name, pwm in dct_arr_motif_pwm.items()
        ]
        for f in as_completed(futures):
            name, fwd, rev = f.result()
            dct_results[name] = {"forward": fwd, "reverse": rev}
    return dct_results

def benchmark_scan(txt_seq, dct_arr_motif_pwm, num_runs=5, max_workers=8):
    # Serial
    times_serial = []
    for _ in range(num_runs):
        t0 = time.time()
        _ = scan_all_motifs_serial(txt_seq, dct_arr_motif_pwm)
        times_serial.append(time.time() - t0)

    # Parallel
    times_parallel = []
    for _ in range(num_runs):
        t0 = time.time()
        _ = scan_all_motifs_parallel(txt_seq, dct_arr_motif_pwm, max_workers=max_workers)
        times_parallel.append(time.time() - t0)

    print("---- Benchmark ----")
    print(f"Serial   mean={statistics.mean(times_serial):.3f}s  std={statistics.stdev(times_serial):.3f}")
    print(f"Parallel mean={statistics.mean(times_parallel):.3f}s  std={statistics.stdev(times_parallel):.3f} (workers={max_workers})")

# import motifs pwm
txt_fpath_motif   = "/hpc/group/igvf/kk319/data/jaspar2024/JASPAR2024_CORE_vertebrates_non-redundant.meme"
dct_arr_motif_pwm = read_meme(txt_fpath_motif)

# Example sequence
txt_seq = "ACGTACGTACGTACGTACGTACGTACGTACGTACGATGCATGCATGCATGCATGCATGCATGCATGCATGCAT"

# Benchmark scanning (Serial)
benchmark_scan(txt_seq, dct_arr_motif_pwm, num_runs=5, max_workers=8)
EOF