In [1]:
import pandas as pd
from scripts.utils import *


In [2]:
VCF_FILE = "data/sample_vcfs/sample.vcf"
VCF_COLNAMES  = ["chr", "pos", "id", "ref", "alt"]


G37_MIRNA_COORDS_FILE = "data/mirna_coordinates/grch37_coordinates.csv"
G37_FASTAS_DIR = "data/fasta/grch37"

INVALID_ROWS_FILE = "invalid.txt"

MIRNA_COORDS_DIR = "data/mirna_coordinates"


Unnamed: 0,chr,pos,id,ref,alt
0,1,24255616,PD4120aC2_1_24255616_A_T,A,T
1,1,24255696,nazoa_1_24255696_C_T,C,T
2,1,24255696,nazooa_1_24255696_A_T,A,T


In [4]:
import logging

def configure_pipeline_logging(log_file_path=None, log_level=logging.INFO):
    """Configure logging for the entire pipeline
    
    Args:
        log_file_path (str, optional): Path to log file. If None, logs only to console
        log_level (int, optional): Logging level. Defaults to logging.INFO
    """
    # Create logger
    logger = logging.getLogger('pipeline')
    logger.setLevel(log_level)
    
    # Create formatters
    console_formatter = logging.Formatter('%(asctime)s | %(levelname)8s | %(message)s')
    file_formatter = logging.Formatter('%(asctime)s | %(levelname)8s | %(name)s | %(message)s')
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(console_formatter)
    console_handler.setLevel(logging.WARNING)  # Show only WARNING+ on console
    
    logger.addHandler(console_handler)
    
    # File handler if path provided
    if log_file_path:
        file_handler = logging.FileHandler(log_file_path)
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)
    
    # Prevent logging from propagating to the root logger
    logger.propagate = False
    
    return logger

logger = configure_pipeline_logging(
    log_file_path='pipeline.log',
    log_level=logging.INFO
)

In [None]:
a few transformations inside pipeline will create secondary files. such as special cases or invalid rows. how do we handle them? these are the functions



In [5]:
from scripts.pipeline_steps.step1 import *

df = validate_ref_nucleotides(df, INVALID_ROWS_FILE )

2025-01-22 19:33:43,376 |     INFO | validated=2 invalid=1


In [None]:
def generate_is_mirna_column(df, grch):
    """
    Adds two columns to the input DataFrame:
    'is_mirna': 1 if the mutation falls within a miRNA region, 0 otherwise
    'mirna_accession': the accession number of the miRNA if 'is_mirna' is 1, None otherwise
    """
    logger = logging.getLogger('pipeline.mirna')
    
    # Load miRNA coordinates
    mirna_coords_file = os.path.join(MIRNA_COORDS_DIR, f"grch{grch}_coordinates.csv")
    coords = pd.read_csv(mirna_coords_file)
    logger.debug(f"Loaded {len(coords)} miRNA coordinates from GRCh{grch}")

    # Initialize new columns
    df['is_mirna'] = 0
    df['mirna_accession'] = None

    # Convert datatypes for both dataframes
    df['chr'] = df['chr'].astype(str)
    df['pos'] = df['pos'].astype(int)
    coords['chr'] = coords['chr'].astype(str)
    coords['start'] = coords['start'].astype(int)
    coords['end'] = coords['end'].astype(int)

    # Match mutations to miRNAs
    matched_count = 0
    for index, row in df.iterrows():
        mutation_chr = row['chr']
        mutation_start = row['pos']

        matching_rnas = coords.loc[(coords['chr'] == mutation_chr) &
                                 (coords['start'] <= mutation_start) &
                                 (coords['end'] >= mutation_start)]

        if not matching_rnas.empty:
            df.at[index, 'is_mirna'] = 1
            df.at[index, 'mirna_accession'] = matching_rnas['mirna_accession'].values[0]
            matched_count += 1

    logger.info(f"mirna_mutations={matched_count} on miRNA/{len(df)} total")
    return df



df = generate_is_mirna_column(df, 37)
df.head()


2025-01-22 19:34:23,736 |     INFO | mirna_mutations=1/2


Unnamed: 0,chr,pos,id,ref,alt,is_mirna,mirna_accession
0,1,24255616,PD4120aC2_1_24255616_A_T,A,T,1,MIMAT0018932
2,1,24255696,nazooa_1_24255696_A_T,A,T,0,


In [None]:
def add_sequence_columns(df):
    logger = logging.getLogger('pipeline.sequences')
    
    # Group by chromosome and position
    grouped = df.groupby(['chr', 'pos'])
    logger.debug(f"Processing {len(grouped)} unique positions")

    def apply_func(group):
        chr_val = group['chr'].iloc[0]
        pos_val = group['pos'].iloc[0]
        
        group['upstream_seq'] = get_upstream_sequence(
            chr_val, pos_val, NUCLEOTIDE_OFFSET)
        group['downstream_seq'] = get_downstream_sequence(
            chr_val, pos_val, group['ref'].iloc[0], NUCLEOTIDE_OFFSET)
        group['wt_seq'] = group['upstream_seq'] + \
            group['ref'] + group['downstream_seq']
        group['mut_seq'] = group['upstream_seq'] + \
            group['alt'] + group['downstream_seq']
        return group

    df = grouped.apply(apply_func)
    df = df.reset_index(drop=True)
    
    logger.info(f"added sequences for rows={len(df)} positions={len(grouped)}")
    return df


NUCLEOTIDE_OFFSET = 30

df = add_sequence_columns(df)
df.head()

  df = grouped.apply(apply_func)
2025-01-22 19:35:17,390 |     INFO | sequences=2 positions=2


Unnamed: 0,chr,pos,id,ref,alt,is_mirna,mirna_accession,upstream_seq,downstream_seq,wt_seq,mut_seq
0,1,24255616,PD4120aC2_1_24255616_A_T,A,T,1,MIMAT0018932,CAGGCTGCTAAACAACAGAACGAGCACTGG,CTTGGAGCCAGAAGTCTTGGGCTCAAGCCC,CAGGCTGCTAAACAACAGAACGAGCACTGGACTTGGAGCCAGAAGT...,CAGGCTGCTAAACAACAGAACGAGCACTGGTCTTGGAGCCAGAAGT...
1,1,24255696,nazooa_1_24255696_A_T,A,T,0,,CTGAATAATTCTGGGCAAATCAGTTGGACC,GTTACACCTCAGCCTTCTCATTTAGGAAAT,CTGAATAATTCTGGGCAAATCAGTTGGACCAGTTACACCTCAGCCT...,CTGAATAATTCTGGGCAAATCAGTTGGACCTGTTACACCTCAGCCT...


In [None]:
def classify_and_get_case_1_mutations(df, vcf_id, start, end, output_dir):
    """
    Classifies mutations into case 1 and case 2, saves case 2 mutations to disk,
    and returns the case 1 mutations.
    """
    logger = logging.getLogger('pipeline.classifier')
    
    # Classify case 1 and case 2 mutations
    case_1 = df[df.is_mirna == 0][["id", "wt_seq", "mut_seq"]]
    case_2 = df[df.is_mirna == 1][["id", "wt_seq", "mut_seq"]]
    
    if not case_2.empty:
        case_2_file = os.path.join(output_dir, f"{vcf_id}_{start}_{end}_case_2.csv")
        case_2.to_csv(case_2_file, index=False)
        logger.debug(f"Saved case 2 mutations to {case_2_file}")

    logger.info(f"classified mutations case1={len(case_1)} case2={len(case_2)}")
    return case_1


classify_and_get_case_1_mutations(df, )

In [8]:
from concurrent.futures import ThreadPoolExecutor, as_completed
WORKERS = 2

In [11]:
run_pipeline(VCF_FILE, 5, "results/test", "vcffff")

In [9]:
def run_pipeline(vcf_full_path: str, chunksize: int, output_dir: str, vcf_id: str):
    with ThreadPoolExecutor(max_workers=WORKERS) as executor:

        futures = []
        start_index = 0
        for chunk in pd.read_csv(vcf_full_path, chunksize=chunksize, sep="\t", header=None, names=["chr", "pos", "id", "ref", "alt"]):
            end_index = start_index + len(chunk) - 1
            future = executor.submit(
                process_chunk, chunk, start_index, end_index, output_dir, vcf_id)
            futures.append(future)
            start_index = end_index + 1

        for future in as_completed(futures):
            start_index, end_index = future.result()
            
            

def process_chunk(chunk: pd.DataFrame, start_index: int, end_index: int, output_dir: str, vcf_id: str) -> tuple:

    result = analysis_pipeline(
        chunk, start_index, end_index, output_dir, vcf_id)

    # Write the result to a CSV file in the output directory
    result_file = os.path.join(
        output_dir, f'result_{start_index}_{end_index}.csv')
    result.to_csv(result_file, index=False)

    return start_index, end_index




def analysis_pipeline(df: pd.DataFrame, start_index: int, end_index: int, output_dir: str, vcf_id: str, verbose: bool = False,) -> pd.DataFrame:

    df = pd.read_csv(VCF_FILE, sep="\t", header=None, names=VCF_COLNAMES)
    df["chr"] = df["chr"].astype(str)

    df['id'] = df['id'].astype(str)
    df['id'] += '_' + df['chr']+ '_' + df['pos'].astype(str) + '_' + df['ref'] + '_' + df['alt']

    # rest of the logic
    
    return df 