# Salmon Quantification Data Processing and Filtering Workflow
## Author: Felipe Villena, PhD
###  Date: 25-07-2024

**Purpose:**

This project provides functionality analogous to the tximport package in R, 
but implemented in Python. It efficiently imports and processes data from 
Salmon Quantification files, consolidating multiple files into a single table.

The script can extract various types of data, including **Transcripts Per Million (TPM)**
and **Read Counts (NumReads)** , among others.

**Key Features:**

1. Parallel processing of multiple Salmon quantification files
2. Flexible data extraction (e.g., TPM, NumReads, or other columns from Salmon output)
3. Consolidation of multiple Salmon output files into a single comprehensive table
4. Efficient batch processing and merging of large datasets
5. Custom filtering options for both rows (genes) and columns (samples)

**Usage:**

Modify the 'columns_to_extract' variable to specify which data type(s) to extract 
(e.g., ['TPM'], ['NumReads'], or ['TPM', 'NumReads'] for both).
Adjust input and output file paths as needed.


# 1 - Import libraries

In [3]:
import yaml
import os
import pandas as pd
import re
from tqdm import tqdm
import csv
import multiprocessing as mp

# 2 - Function Definitions

### process_file(file, columns_to_extract)
This function processes a single Salmon quantification file. It extracts specified columns (e.g., TPM, NumReads) and renames them with the sample identifier.

### process_batch(batch_files, output_file, batch_num, columns_to_extract)
This function processes a batch of Salmon quantification files. It's designed to work with the multiprocessing pool for parallel processing.

### merge_batch_files(temp_files, output_file)
This function merges the processed batch files into a single comprehensive table. It handles the consolidation of data from multiple Salmon output files.

### main(input_folder, output_file, columns_to_extract, batch_size=100)

The main function orchestrates the entire workflow. It's structured to:

1. Locate and list all Salmon quantification files
2. Set up parallel processing using Python's multiprocessing
3. Process files in batches
4. Merge results into a single output file

This structure allows for efficient processing of large datasets by:
- Utilizing parallel processing to speed up file reading and initial processing
- Processing files in batches to manage memory usage
- Consolidating results incrementally to handle large numbers of samples

### filter_dataset(input_file, output_file, row_condition=None, column_condition=None)
This function applies filtering to the consolidated dataset. It can filter both rows (e.g., genes) and columns (e.g., samples) based on specified conditions.

In [4]:

def load_config(config_file):
    """Load configuration from YAML file."""
    with open(config_file, 'r') as file:
        return yaml.safe_load(file)

def process_file(file, columns_to_extract, sample_id_pattern):
    """
    Process a single Salmon quantification file.
    
    Args:
    file (str): Path to the Salmon quantification file.
    columns_to_extract (list): List of column names to extract from the file.
    sample_id_pattern (str): Regular expression pattern to extract sample ID from filename.
    
    Returns:
    pd.DataFrame: Processed data from the file.
    """
    try:
        sample_name_match = re.search(sample_id_pattern, file)
        sample_name = sample_name_match.group('sample_id') if sample_name_match else "unknown"
        
        usecols = ['Name'] + columns_to_extract
        sample_data = pd.read_csv(file, sep='\t', usecols=usecols)
        
        sample_data = sample_data.rename(columns={col: f'{sample_name}' for col in columns_to_extract})
        sample_data = sample_data.set_index('Name')
        
        return sample_data
    except Exception as e:
        print(f"Error processing file {file}: {str(e)}")
        return None

def process_batch(batch_files, output_file, batch_num, columns_to_extract, sample_id_pattern):
    """
    Process a batch of Salmon quantification files.
    
    Args:
    batch_files (list): List of file paths to process.
    output_file (str): Base name for the output file.
    batch_num (int): Batch number for identification.
    columns_to_extract (list): List of column names to extract from each file.
    sample_id_pattern (str): Regular expression pattern to extract sample ID from filename.
    
    Returns:
    str: Path to the temporary batch file.
    """
    print(f"Processing batch {batch_num} with {len(batch_files)} files")
    batch_data = [process_file(file, columns_to_extract, sample_id_pattern) for file in batch_files]
    batch_data = [data for data in batch_data if data is not None]
    
    if not batch_data:
        print(f"Warning: Batch {batch_num} has no valid data.")
        return None
    
    combined_data = pd.concat(batch_data, axis=1)
    
    temp_file = f"{output_file}.batch{batch_num}"
    combined_data.to_csv(temp_file, sep='\t')
    
    print(f"Batch {batch_num} processed successfully. Shape: {combined_data.shape}")
    return temp_file

def merge_batch_files(temp_files, output_file):
    """
    Merge processed batch files into a single output file.
    
    Args:
    temp_files (list): List of temporary batch file paths.
    output_file (str): Path for the final output file.
    """
    print(f"Merging {len(temp_files)} batch files...")
    all_data = []
    for i, temp_file in enumerate(tqdm(temp_files, desc="Reading batch files")):
        try:
            if not os.path.exists(temp_file):
                print(f"Warning: Temp file {temp_file} does not exist.")
                continue
            
            batch_data = pd.read_csv(temp_file, sep='\t', index_col='Name')
            print(f"Batch {i} shape: {batch_data.shape}")
            
            if batch_data.empty:
                print(f"Warning: Batch {i} is empty.")
                continue
            
            all_data.append(batch_data)
        except Exception as e:
            print(f"Error processing {temp_file}: {str(e)}")
        finally:
            if os.path.exists(temp_file):
                os.remove(temp_file)
    
    if not all_data:
        raise ValueError("No data was successfully read from temporary files.")
    
    print("Concatenating all data...")
    final_data = pd.concat(all_data, axis=1)
    print(f"Final data shape: {final_data.shape}")
    
    print("Writing final output...")
    final_data.to_csv(output_file, sep='\t')

def main(input_folder, output_file, columns_to_extract, batch_size, sample_id_pattern):
    """
    Main function to process Salmon quantification files.
    
    Args:
    input_folder (str): Path to the folder containing Salmon quantification files.
    output_file (str): Path for the final output file.
    columns_to_extract (list): List of column names to extract from each file.
    batch_size (int): Number of files to process in each batch.
    sample_id_pattern (str): Regular expression pattern to extract sample ID from filename.
    """
    print(f"Current working directory: {os.getcwd()}")
    print(f"Looking for files in: {os.path.abspath(input_folder)}")

    if not os.path.exists(input_folder):
        print(f"Error: The folder '{input_folder}' does not exist in the current directory.")
        print("Contents of current directory:")
        print(os.listdir())
        return

    files = [os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith('.sf')]
    print(f"Total .sf files found: {len(files)}")

    if not files:
        print("Error: No files found to process. Please check the file extension and directory.")
        return

    num_cores = mp.cpu_count() - 1
    batches = [files[i:i+batch_size] for i in range(0, len(files), batch_size)]

    pool = mp.Pool(num_cores)

    results = []
    with tqdm(total=len(batches), desc="Processing batches") as pbar:
        for result in pool.starmap(process_batch, [(batch, output_file, i, columns_to_extract, sample_id_pattern) for i, batch in enumerate(batches)]):
            if result is not None:
                results.append(result)
            else:
                print(f"Batch {len(results)} failed to process")
            pbar.update()

    pool.close()
    pool.join()

    if not results:
        print("Error: No batches were successfully processed. Check the logs above for details.")
    else:
        print(f"Successfully processed {len(results)} batches out of {len(batches)}")
        merge_batch_files(results, output_file)

    print(f"Processing complete. Output written to {output_file}")

def filter_dataset(input_file, output_file, row_condition=None, column_condition=None):
    """
    Filter the processed dataset based on row and column conditions.
    
    Args:
    input_file (str): Path to the input file to filter.
    output_file (str): Path for the filtered output file.
    row_condition (list or callable): Condition to filter rows.
    column_condition (list or callable): Condition to filter columns.
    """
    print(f"Loading data from {input_file}")
    df = pd.read_csv(input_file, sep='\t', index_col='Name')
    
    original_shape = df.shape
    print(f"Original dataset shape: {original_shape}")

    if row_condition is not None:
        if callable(row_condition):
            df = df.loc[df.index.map(row_condition)]
        else:
            df = df.loc[df.index.isin(row_condition)]
    
    if column_condition is not None:
        if callable(column_condition):
            df = df.loc[:, df.columns.map(column_condition)]
        else:
            df = df.loc[:, df.columns.isin(column_condition)]
    
    print(f"Filtered dataset shape: {df.shape}")
    
    df.to_csv(output_file, sep='\t')
    print(f"Filtered data saved to {output_file}")

# 3 - Main Execution

## Filtering the Consolidated Dataset

After consolidating the Salmon quantification data, we often need to filter the dataset for specific analyses. In RNA-seq studies, it's common to filter both rows (genes) and columns (samples). Let's break down our filtering process:

### Row Filtering (Genes)
We filter rows to focus on genes of interest, often based on prior analyses or biological significance.

In this example, we're using results from a meta-analysis of RNA-seq data in Parkinson's Disease:

- We load a file 'Meta-analysis-RNAseq-PD-v2.csv' which contains genes ranked by their importance in PD.
- We select the top 151 genes based on 'Feature_Importance_1'.
- These genes likely represent those most relevant to PD based on the meta-analysis.

### Column Filtering (Samples)
We filter columns to select specific samples for our analysis, often based on clinical or quality control criteria.

In this example:

- We load a file 'PatientsSelected.tsv' which contains information about patients/samples.
- We use the 'HudAlphaID' column to select specific samples.
- This could represent a subset of patients meeting certain clinical criteria, quality thresholds, or belonging to a specific study group.

### Applying the Filters

The `filter_dataset` function applies these filters to our consolidated data:

- Row condition (Feat_1): Keeps only the top 151 genes from our meta-analysis.
- Column condition (Patients_filter): Keeps only the samples specified in our patient selection file.

This filtered dataset (PPMI_PD_Counts_Feature1.tsv) will contain expression data for our genes of interest across the selected patient samples, ready for downstream analysis.

In [5]:
if __name__ == '__main__':
    # 1 -Load configuration
    config = load_config('config.yaml')

    # 2 - Process Salmon quantification files
    main(config['input_folder'], 
         config['output_file'], 
         config['columns_to_extract'], 
         config['batch_size'],
         config['sample_id_pattern'])

    # 3 - Load filters
    Patients = pd.read_csv(config['patients_file'], sep='\t')
    Patients_filter = Patients[config['patients_id_column']]

    Meta_Analysis = pd.read_csv(config['meta_analysis_file'])
    Feat_1 = Meta_Analysis.sort_values(config['feature_importance_column'], ascending=False)['Gene'][:config['top_genes_count']].values

    # 4 - Apply filters
    filter_dataset(input_file=config['output_file'], 
                   output_file=config['filtered_output_file'], 
                   row_condition=Feat_1, 
                   column_condition=Patients_filter)

    print(f"Processing complete. Filtered output written to {config['filtered_output_file']}")

    # Optional: Display summary of filtered data
    filtered_data = pd.read_csv(config['filtered_output_file'], sep='\t', index_col='Name')
    print(f"\nFiltered dataset shape: {filtered_data.shape}")
    print(f"Number of genes: {filtered_data.shape[0]}")
    print(f"Number of samples: {filtered_data.shape[1]}")

Current working directory: /home/jovyan/mounted_data
Looking for files in: /home/jovyan/mounted_data/quant_salmon_genes
Total .sf files found: 4756


Processing batches:   0%|          | 0/48 [00:00<?, ?it/s]

Processing batch 4 with 100 filesProcessing batch 2 with 100 filesProcessing batch 0 with 100 filesProcessing batch 6 with 100 filesProcessing batch 12 with 100 files

Processing batch 8 with 100 files

Processing batch 10 with 100 files


Batch 6 processed successfully. Shape: (58294, 100)
Batch 8 processed successfully. Shape: (58294, 100)
Batch 0 processed successfully. Shape: (58294, 100)
Processing batch 7 with 100 files
Batch 12 processed successfully. Shape: (58294, 100)
Processing batch 9 with 100 files
Processing batch 1 with 100 files
Processing batch 13 with 100 files
Batch 10 processed successfully. Shape: (58294, 100)
Processing batch 11 with 100 files
Batch 4 processed successfully. Shape: (58294, 100)
Batch 2 processed successfully. Shape: (58294, 100)
Processing batch 5 with 100 files
Processing batch 3 with 100 files
Batch 7 processed successfully. Shape: (58294, 100)
Batch 13 processed successfully. Shape: (58294, 100)
Processing batch 14 with 100 files
Processing bat

Processing batches: 100%|██████████| 48/48 [00:53<00:00,  1.11s/it]


Successfully processed 48 batches out of 48
Merging 48 batch files...


Reading batch files:   2%|▏         | 1/48 [00:00<00:12,  3.62it/s]

Batch 0 shape: (58294, 100)


Reading batch files:   4%|▍         | 2/48 [00:00<00:12,  3.77it/s]

Batch 1 shape: (58294, 100)


Reading batch files:   6%|▋         | 3/48 [00:00<00:11,  3.81it/s]

Batch 2 shape: (58294, 100)


Reading batch files:   8%|▊         | 4/48 [00:01<00:11,  3.79it/s]

Batch 3 shape: (58294, 100)


Reading batch files:  10%|█         | 5/48 [00:01<00:11,  3.77it/s]

Batch 4 shape: (58294, 100)


Reading batch files:  12%|█▎        | 6/48 [00:01<00:11,  3.77it/s]

Batch 5 shape: (58294, 100)


Reading batch files:  15%|█▍        | 7/48 [00:01<00:10,  3.73it/s]

Batch 6 shape: (58294, 100)


Reading batch files:  17%|█▋        | 8/48 [00:02<00:10,  3.74it/s]

Batch 7 shape: (58294, 100)


Reading batch files:  19%|█▉        | 9/48 [00:02<00:10,  3.67it/s]

Batch 8 shape: (58294, 100)


Reading batch files:  21%|██        | 10/48 [00:02<00:10,  3.63it/s]

Batch 9 shape: (58294, 100)


Reading batch files:  23%|██▎       | 11/48 [00:02<00:10,  3.66it/s]

Batch 10 shape: (58294, 100)


Reading batch files:  25%|██▌       | 12/48 [00:03<00:09,  3.70it/s]

Batch 11 shape: (58294, 100)


Reading batch files:  27%|██▋       | 13/48 [00:03<00:09,  3.71it/s]

Batch 12 shape: (58294, 100)


Reading batch files:  29%|██▉       | 14/48 [00:03<00:09,  3.72it/s]

Batch 13 shape: (58294, 100)


Reading batch files:  31%|███▏      | 15/48 [00:04<00:08,  3.73it/s]

Batch 14 shape: (58294, 100)


Reading batch files:  33%|███▎      | 16/48 [00:04<00:08,  3.76it/s]

Batch 15 shape: (58294, 100)


Reading batch files:  35%|███▌      | 17/48 [00:04<00:08,  3.77it/s]

Batch 16 shape: (58294, 100)


Reading batch files:  38%|███▊      | 18/48 [00:04<00:07,  3.78it/s]

Batch 17 shape: (58294, 100)


Reading batch files:  40%|███▉      | 19/48 [00:05<00:07,  3.74it/s]

Batch 18 shape: (58294, 100)


Reading batch files:  42%|████▏     | 20/48 [00:05<00:07,  3.77it/s]

Batch 19 shape: (58294, 100)


Reading batch files:  44%|████▍     | 21/48 [00:05<00:07,  3.75it/s]

Batch 20 shape: (58294, 100)


Reading batch files:  46%|████▌     | 22/48 [00:05<00:06,  3.73it/s]

Batch 21 shape: (58294, 100)


Reading batch files:  48%|████▊     | 23/48 [00:06<00:06,  3.61it/s]

Batch 22 shape: (58294, 100)


Reading batch files:  50%|█████     | 24/48 [00:06<00:06,  3.59it/s]

Batch 23 shape: (58294, 100)


Reading batch files:  52%|█████▏    | 25/48 [00:06<00:06,  3.58it/s]

Batch 24 shape: (58294, 100)


Reading batch files:  54%|█████▍    | 26/48 [00:07<00:06,  3.65it/s]

Batch 25 shape: (58294, 100)


Reading batch files:  56%|█████▋    | 27/48 [00:07<00:05,  3.67it/s]

Batch 26 shape: (58294, 100)


Reading batch files:  58%|█████▊    | 28/48 [00:07<00:05,  3.74it/s]

Batch 27 shape: (58294, 100)


Reading batch files:  60%|██████    | 29/48 [00:07<00:05,  3.74it/s]

Batch 28 shape: (58294, 100)


Reading batch files:  62%|██████▎   | 30/48 [00:08<00:04,  3.76it/s]

Batch 29 shape: (58294, 100)


Reading batch files:  65%|██████▍   | 31/48 [00:08<00:04,  3.77it/s]

Batch 30 shape: (58294, 100)


Reading batch files:  67%|██████▋   | 32/48 [00:08<00:04,  3.78it/s]

Batch 31 shape: (58294, 100)


Reading batch files:  69%|██████▉   | 33/48 [00:08<00:03,  3.78it/s]

Batch 32 shape: (58294, 100)


Reading batch files:  71%|███████   | 34/48 [00:09<00:03,  3.78it/s]

Batch 33 shape: (58294, 100)


Reading batch files:  73%|███████▎  | 35/48 [00:09<00:03,  3.77it/s]

Batch 34 shape: (58294, 100)


Reading batch files:  75%|███████▌  | 36/48 [00:09<00:03,  3.80it/s]

Batch 35 shape: (58294, 100)


Reading batch files:  77%|███████▋  | 37/48 [00:09<00:02,  3.83it/s]

Batch 36 shape: (58294, 100)


Reading batch files:  79%|███████▉  | 38/48 [00:10<00:02,  3.80it/s]

Batch 37 shape: (58294, 100)


Reading batch files:  81%|████████▏ | 39/48 [00:10<00:02,  3.74it/s]

Batch 38 shape: (58294, 100)


Reading batch files:  83%|████████▎ | 40/48 [00:10<00:02,  3.68it/s]

Batch 39 shape: (58294, 100)


Reading batch files:  85%|████████▌ | 41/48 [00:11<00:01,  3.67it/s]

Batch 40 shape: (58294, 100)


Reading batch files:  88%|████████▊ | 42/48 [00:11<00:01,  3.69it/s]

Batch 41 shape: (58294, 100)


Reading batch files:  90%|████████▉ | 43/48 [00:11<00:01,  3.74it/s]

Batch 42 shape: (58294, 100)


Reading batch files:  92%|█████████▏| 44/48 [00:11<00:01,  3.75it/s]

Batch 43 shape: (58294, 100)


Reading batch files:  94%|█████████▍| 45/48 [00:12<00:00,  3.78it/s]

Batch 44 shape: (58294, 100)


Reading batch files:  96%|█████████▌| 46/48 [00:12<00:00,  3.80it/s]

Batch 45 shape: (58294, 100)


Reading batch files: 100%|██████████| 48/48 [00:12<00:00,  3.77it/s]

Batch 46 shape: (58294, 100)
Batch 47 shape: (58294, 56)
Concatenating all data...





Final data shape: (58294, 4756)
Writing final output...
Processing complete. Output written to PPMI_PD_Counts_24072024.tsv
Loading data from PPMI_PD_Counts_24072024.tsv
Original dataset shape: (58294, 4756)
Filtered dataset shape: (150, 1768)
Filtered data saved to PPMI_PD_Counts_Feature1.tsv
Processing complete. Filtered output written to PPMI_PD_Counts_Feature1.tsv

Filtered dataset shape: (150, 1768)
Number of genes: 150
Number of samples: 1768


# 4 - Conclusion

This notebook demonstrates a efficient method for consolidating and processing Salmon quantification data using Python. The workflow is designed to handle RNA-seq datasets.

Key advantages of this approach include:

- Parallel processing for improved performance
- Batch processing to manage memory usage with large datasets
- Flexibility in data extraction (TPM, NumReads, or other Salmon output columns)
- Custom filtering capabilities for downstream analysis