In [48]:
import yaml
import pandas as pd
from typing import Generator, Tuple, Dict
import multiprocessing
from typing import Optional

In [5]:
def read_fasta(filepath: str) -> Generator[Tuple[str, str], None, None]:
    """
    Parses a FASTA file and yields record names and sequences.
    Args:
        filepath (str): Path to the FASTA file.
    Yields:
        tuple: A tuple containing the record name and sequence.
    """
    try:
        with open(filepath, 'r') as file:
            name, sequence = None, []
            for line in file:
                line = line.strip()
                if line.startswith(">"):
                    if name:
                        yield name, ''.join(sequence)
                    name, sequence = line[1:], []
                else:
                    sequence.append(line)
            if name:
                yield name, ''.join(sequence)
    except FileNotFoundError:
        raise FileNotFoundError(f"File not found: {filepath}")
    except Exception as e:
        raise RuntimeError(f"An error occurred while reading the FASTA file: {e}")


In [12]:
filepath = "tests/test_fasta.fa"

In [15]:
for _,seq in read_fasta(filepath):
    print(seq)

ATGCGTACGTAGCTAGCGTAGCTAGT
CGTAGCTAGTACGATCGTACGTAGCT


In [58]:

def load_feature_data(prop: str, feature_file: str="data/lookup.yaml") -> Dict[str, float]:
    """
    Loads feature data from a YAML file for a given k-mer length.

    Args:
        feature_file (str): Path to the YAML file containing feature data.
        kmer_len (int): Length of the k-mer.

    Returns:
        dict: A dictionary of feature data.
    """
    try:
        with open(feature_file, 'r') as f:
            data = yaml.safe_load(f)
            if str(prop) in data:
                return data[str(prop)]
            else:
                raise ValueError(f"k-mer length {kmer_len} not found in feature data.")
    except FileNotFoundError:
        raise FileNotFoundError(f"Feature file not found: {feature_file}")
    except yaml.YAMLError as e:
        raise RuntimeError(f"An error occurred while parsing the YAML file: {e}")
    except Exception as e:
        raise RuntimeError(f"An error occurred while loading feature data: {e}")


In [59]:
feature_lookup = load_feature_data(prop = "trinucleotide", 
                  feature_file="DNAflexpy/data/lookup.yaml")
feature_lookup

{'NPP': {'AAA': 36,
  'AAC': 6,
  'AAG': 6,
  'AAT': 30,
  'ACA': 6,
  'ACC': 8,
  'ACG': 8,
  'ACT': 11,
  'AGA': 9,
  'AGC': 25,
  'AGG': 8,
  'AGT': 11,
  'ATA': 13,
  'ATC': 7,
  'ATG': 18,
  'ATT': 30,
  'CAA': 9,
  'CAC': 17,
  'CAG': 2,
  'CAT': 18,
  'CCA': 8,
  'CCC': 13,
  'CCG': 2,
  'CCT': 8,
  'CGA': 31,
  'CGC': 25,
  'CGG': 2,
  'CGT': 8,
  'CTA': 18,
  'CTC': 8,
  'CTG': 2,
  'CTT': 6,
  'GAA': 12,
  'GAC': 8,
  'GAG': 8,
  'GAT': 7,
  'GCA': 13,
  'GCC': 45,
  'GCG': 25,
  'GCT': 25,
  'GGA': 5,
  'GGC': 45,
  'GGG': 13,
  'GGT': 8,
  'GTA': 6,
  'GTC': 8,
  'GTG': 17,
  'GTT': 6,
  'TAA': 20,
  'TAC': 6,
  'TAG': 18,
  'TAT': 13,
  'TCA': 8,
  'TCC': 5,
  'TCG': 31,
  'TCT': 9,
  'TGA': 8,
  'TGC': 13,
  'TGG': 8,
  'TGT': 6,
  'TTA': 20,
  'TTC': 12,
  'TTG': 9,
  'TTT': 36},
 'DNaseI': {'AAT': -0.28,
  'ATT': -0.28,
  'AAA': -0.274,
  'TTT': -0.274,
  'CCA': -0.246,
  'TGG': -0.246,
  'AAC': -0.205,
  'GTT': -0.205,
  'CCG': -0.136,
  'CGG': -0.136,
  'ATC': -0.11,


In [25]:
def transform_seq_to_feat(sequence, kmer_len, feature, feature_lookup):
    """
    Calculates average structural profile values for a given sequence window.

    Args:
        sequence (str): DNA sequence to process.
        kmer_len (int): Length of k-mers.
        feature (str): Feature to calculate.
        feature_lookup (dict): Dictionary containing feature data.

    Returns:
        float: Average feature value for the given sequence window.
    """
    sequence = sequence.upper()
    feature_data = feature_lookup.get(feature, {})

    if not feature_data:
        print(f"**FATAL** ==>{feature}<== not in lookup data (.yaml file). Check spelling or add it.")
        return 0

    ls_values_w = []
    for i in range(len(sequence) - kmer_len + 1):
        subseq = sequence[i: i + kmer_len]
        value = feature_data.get(subseq)
        if value is not None:
            ls_values_w.append(value)
        else:
            print(f"**Warning** Subsequence {subseq} not found in the dictionary for {feature}")

    avg_w = sum(ls_values_w) / (len(sequence) - 1) if ls_values_w else 0
    return avg_w


In [27]:
transform_seq_to_feat(sequence="AATGCCATTAGATTAG", kmer_len=3, feature="DNaseI", feature_lookup=feature_lookup)

-0.026800000000000004

In [60]:
def seq_to_numeric_profile(sequence, 
                        window_size, 
                        kmer_len, 
                        feature, 
                        feature_lookup):
    """
        Desc:
            - Operates on a sequence to aggregate average feature value of all the overlapping window

        arguments:
            - sequence
            - window size to make overlapping window
            - k-mer length
            - feature: Feature to calculate
            - feature_lookup: Dictionary containing feature data
            
        returns:
            - a list of values for a sequence

    """
    ls_window_avg_for_seq = []
    for w_start in range(len(sequence) - window_size + 1):
        current_w_seq = sequence[w_start : w_start + window_size]
        avg_w = transform_seq_to_feat(current_w_seq, kmer_len, feature, feature_lookup)
        ls_window_avg_for_seq.append(round(avg_w, 3))
    
    return ls_window_avg_for_seq

def process_sequence(record, 
                    window_size, 
                    prop, 
                    feature, 
                    feature_lookup):
    """
        Takes a sequence, 
        window size,
        kmer length,
        feature,
        feature_lookup
        
    """
    current_seq = record.seq
    feature_value = seq_to_numeric_profile(current_seq, window_size, kmer_len, feature, feature_lookup)
    return feature_value


In [40]:
seq_to_numeric_profile(sequence="AATGCCATTAGATTAG", 
                        window_size=5, 
                        kmer_len=3, 
                        feature='DNaseI', 
                        feature_lookup=feature_lookup)

[-0.018,
 0.079,
 -0.016,
 -0.001,
 -0.098,
 -0.02,
 -0.031,
 0.046,
 0.002,
 -0.091,
 -0.081,
 -0.031]

In [85]:
def process_sequence(record, 
                    window_size, 
                    kmer_len, 
                    feature, 
                    feature_lookup):
    """
        Takes a sequence, 
        window size,
        kmer length,
        feature,
        feature_lookup
        
    """
    feature_value = seq_to_numeric_profile(record, window_size, kmer_len, feature, feature_lookup)
    return feature_value


In [46]:
for _,seq in read_fasta(filepath):
    t = process_sequence(record = seq, window_size=5, kmer_len=3, feature='DNaseI', feature_lookup= feature_lookup)
    print(t)

[0.033, -0.009, -0.021, 0.004, 0.004, -0.01, -0.01, 0.02, 0.033, 0.031, 0.031, 0.049, 0.049, 0.007, -0.023, -0.021, 0.02, 0.033, 0.031, 0.031, 0.049, -0.001]
[0.02, 0.033, 0.031, 0.031, 0.049, -0.001, -0.017, -0.033, 0.004, -0.003, -0.036, -0.056, -0.056, -0.036, -0.003, 0.004, 0.004, -0.01, -0.01, 0.02, 0.033, 0.031]


In [82]:
def calculate_bendability(input_file: str, window_size: int, prop: str, kmer_len: int, feature: str, feature_file: str, threads: int, outfile: Optional[str] = None) -> Optional[pd.DataFrame]:
    """
    Main function to calculate feature profiles from a FASTA file.

    Args:
        input_file (str): Path to the input multifasta file.
        window_size (int): Size of the processing window.
        prop (int): Dinucleotide/Trinucleotide.
        feature (str): Feature to calculate.
        feature_file (str): Path to the YAML file containing feature data.
        threads (int): Number of threads to use.
        outfile (str, optional): Output file name.

    Returns:
        Optional[pd.DataFrame]: Feature profiles if outfile is not specified, otherwise None.
    """
    try:
        # Load feature data
        feature_lookup = load_feature_data(prop, feature_file)
        
        # Parse the fasta file and process sequences
#         records = list(read_fasta(input_file))
        with multiprocessing.Pool(processes=threads) as pool:
            results = pool.starmap(process_sequence, [(seq, window_size, kmer_len, feature, feature_lookup) for _, seq in read_fasta(input_file)])

        # Create DataFrame from results
        df = pd.DataFrame(results)
        
        if outfile:
            df.to_csv(outfile, index=False, header=False, sep="\t")
            logging.info("Process completed! Output written to %s", outfile)
        else:
            return df  # Return DataFrame if no output file is specified
    except Exception as e:
        logging.error("An error occurred: %s", e)
        raise e

In [86]:
import logging
test = calculate_bendability(input_file= "tests/test_fasta.fa", 
                   window_size = 5,
                   prop = "trinucleotide",
                   kmer_len= 3, 
                   feature = 'DNaseI',
                   feature_file = 'DNAflexpy/data/lookup.yaml',
                   threads = 2)

Process SpawnPoolWorker-19:
Process SpawnPoolWorker-20:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/upalabdha/mambaforge/envs/test_DNAflexpy/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/Users/upalabdha/mambaforge/envs/test_DNAflexpy/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/upalabdha/mambaforge/envs/test_DNAflexpy/lib/python3.13/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/upalabdha/mambaforge/envs/test_DNAflexpy/lib/python3.13/multiprocessing/queues.py", line 387, in get
    return _ForkingPickler.loads(res)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get attribute 'process_sequence' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
  File "/Users/upalabdha/mambaforge/envs/test_DNAflexpy/lib/p

KeyboardInterrupt: 

In [None]:
!ls