# Isomatrix Tools

> A suite of utilities for handling isomatrices generated by SiCeLoRe. The tools cover tasks such as converting isomatrices to AnnData format and merging multiple isomatrices. The suite can also download or simulate isomatrix data, which is useful for testing and demonstrating new features.


In [1]:
#| default_exp isomatool

In [2]:
#| hide
from nbdev.showdoc import *

The Isomatrix Tools module provides utilities for working with isomatrices, particularly those generated by SiCeLoRe. A key function converts isomatrix text files (the SiCeLoRe output) into AnnData objects compatible with Scanpy, which enables downstream analysis of single-cell long-read data within the Scanpy ecosystem. The conversion retains the gene and transcript information (`geneId`, `transcriptId`, `nbExons`) and can produce either a dense or a sparse matrix, depending on computational needs.
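The layout that the conversion relies on can be illustrated with a minimal sketch that uses only pandas. The toy barcodes and transcript names are hypothetical, and the sketch indexes features by `transcriptId` for readability, which is a simplification rather than the exact behaviour of `isomatrix_to_anndata`.

```python
import io
import pandas as pd

# Toy isomatrix: geneId is the index column, transcriptId and nbExons are
# metadata columns, and every remaining column is a cell barcode.
raw = (
    "geneId\ttranscriptId\tnbExons\tCELL_A\tCELL_B\tCELL_C\n"
    "Gene_1\tT1\t3\t5\t0\t1\n"
    "Gene_1\tundef\t0\t2\t1\t0\n"
    "Gene_2\tT2\t7\t0\t4\t2\n"
)
df = pd.read_csv(io.StringIO(raw), sep="\t", index_col=0)

# Drop rows that could not be assigned to a transcript.
df = df[df["transcriptId"] != "undef"].reset_index()

# Split the feature metadata from the count columns.
meta_cols = ["geneId", "transcriptId", "nbExons"]
var = df[meta_cols].set_index("transcriptId", drop=False)
counts = df.drop(columns=meta_cols).to_numpy(dtype="float32")

# AnnData expects observations (cells) as rows, so transpose the counts.
X = counts.T
obs_names = [c for c in df.columns if c not in meta_cols]

print(X.shape)      # (3, 2): three cells, two transcripts
print(obs_names)    # ['CELL_A', 'CELL_B', 'CELL_C']
```

`X`, `obs_names`, and `var` correspond to the three pieces an AnnData object is assembled from: the matrix, the observation index, and the feature annotations.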


In [3]:
#| hide
#| export
import pandas as pd
from scipy.sparse import csr_matrix, issparse
import scanpy as sc
import warnings
import anndata as ad
from anndata import AnnData
import urllib.request
import gzip
import shutil
import os
import time
from multiprocessing import Pool
from functools import partial



In [4]:
#| export

def isomatrix_to_anndata(file_path:str,  # The path to the tab-separated isomatrix text file to be read.
                        sparse:bool=False  # Flag to determine if the output should be a sparse matrix.
) -> AnnData: # The converted isomatrix as a scanpy compatible anndata object
    """
    This function converts an isomatrix txt file (SiCeLoRe output) into an AnnData object compatible with scanpy

    """
    
    # Read in the data from the file
    df = pd.read_csv(file_path, sep='\t', index_col=0)
    # Filter out rows where the transcriptId is "undef"
    df = df.loc[df["transcriptId"] != "undef"]
    
    df = df.reset_index()
    df = df.transpose()
    
    # Extract the rows with 'geneId', 'transcriptId', 'nbExons' from the DataFrame
    additional_info_rows = df.loc[df.index.intersection(['geneId', 'transcriptId', 'nbExons'])]
    # Drop 'geneId', 'transcriptId', 'nbExons' rows from the DataFrame if they exist
    df = df.drop(['geneId', 'transcriptId', 'nbExons'], errors='ignore')

    # Convert the counts to float32, failing early on non-numeric data
    try:
        matrix = df.values.astype('float32')
    except ValueError:
        print("Error: Non-numeric data present in the DataFrame. Cannot convert to float.")
        return None

    # Store the matrix in sparse (CSR) format if the sparse flag is True
    if sparse:
        matrix = csr_matrix(matrix)
    
    # Convert the matrix to an AnnData object
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # Ensure that the index and columns are converted to strings
        obs_index_as_str = df.index.astype(str)
        var_columns_as_str = df.columns.astype(str)
        anndata = sc.AnnData(X=matrix, obs=pd.DataFrame(index=obs_index_as_str), var=pd.DataFrame(index=var_columns_as_str))
    
    # Add additional information to the AnnData object vars
    for info in ['geneId', 'transcriptId', 'nbExons']:
        if info in additional_info_rows.index:
            anndata.var[info] = additional_info_rows.loc[info, :].astype(str).values
    
    # Ensure unique observation names
    anndata.obs_names_make_unique()
    
    return anndata



This section pertains to the retrieval of sample data which is essential for testing and validating the functionality of the Isomatrix Tools module. The sample data is an isomatrix text file, which is a typical output of the SiCeLoRe pipeline, and is used to ensure that the conversion to an AnnData object is performed correctly. The downloaded file is also used in the demonstration and testing of other functions within this module.
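The extraction step can be sketched without any network access. In the example below, a small gzip archive with hypothetical contents stands in for the downloaded file, and the file names simply mirror those used in this notebook.

```python
import gzip
import os
import shutil
import tempfile

workdir = tempfile.mkdtemp()
gz_path = os.path.join(workdir, "file.txt.gz")
txt_path = os.path.join(workdir, "sample_isomatrix.txt")

# Stand-in for the downloaded archive (hypothetical contents).
with gzip.open(gz_path, "wt") as fh:
    fh.write("geneId\ttranscriptId\tnbExons\tCELL_A\nGene_1\tT1\t3\t5\n")

# Stream-decompress so the whole file never has to fit in memory.
with gzip.open(gz_path, "rb") as f_in, open(txt_path, "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)

# Read back the header to confirm the extracted file is a valid isomatrix.
with open(txt_path) as fh:
    header = fh.readline().rstrip("\n").split("\t")
print(header)
```

Copying with `shutil.copyfileobj` keeps memory use flat, which matters because real isomatrix files can be large.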


In [5]:
#| export
def download_test_data() -> str: #The absolute path of the extracted file 'sample_isomatrix.txt' if the download is successful.
    """
    This function downloads a test data file from a specified URL, saves it locally, and extracts it.
    """

    # URL of the file to be downloaded
    url = "https://ftp.ncbi.nlm.nih.gov/geo/samples/GSM3748nnn/GSM3748087/suppl/GSM3748087%5F190c.isoforms.matrix.txt.gz"

    # Download the file from `url` and save it locally under `file.txt.gz`:
    urllib.request.urlretrieve(url, 'file.txt.gz')

    # Check if the file is downloaded correctly
    if os.path.exists('file.txt.gz'):
        print("File downloaded successfully")
        # Now we need to extract the file
        with gzip.open('file.txt.gz', 'rb') as f_in:
            with open('sample_isomatrix.txt', 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        print("File extracted successfully")
        return os.path.abspath('sample_isomatrix.txt')
    else:
        print("Failed to download the file")
        return None


The Isomatrix Tools module includes functionality to simulate an isomatrix, which is a matrix representation of transcript expression data. This simulated data can be used for testing and validation purposes within the Isomatrix Tools framework. The simulation is designed to mimic the structure and characteristics of real transcriptomic datasets, allowing users to generate data with specified parameters such as the number of genes, transcripts per gene, number of samples, sparsity of the matrix, and maximum expression levels. The simulate_isomatrix function in the code block below provides a practical example of how such data can be generated for use with Isomatrix Tools.
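How the sparsity and maximum expression parameters interact can be shown with a short NumPy sketch. It uses `np.random.default_rng` instead of a global seed, which is a substitution made for this illustration, and the matrix shape is arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
shape = (200, 50)          # transcripts x samples (arbitrary for this sketch)
sparsity = 0.95
max_expression = 100

# Draw uniform values, then keep only about (1 - sparsity) of the entries.
values = rng.random(shape)
keep_mask = rng.random(shape) > sparsity
values[~keep_mask] = 0

# Scale to integer counts; np.ceil maps every kept value into 1..max_expression.
counts = np.ceil(values * max_expression).astype(int)

zero_fraction = (counts == 0).mean()
print(f"fraction of zeros: {zero_fraction:.3f}")
```

The observed fraction of zeros fluctuates around `sparsity` because the mask is drawn at random for each entry.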


In [6]:
#| export

import numpy as np 
from pandas import DataFrame

def simulate_isomatrix(num_genes: int, # number of genes (groups of rows)
                       num_transcripts_per_gene: int, # number of transcripts per gene
                       num_samples: int, # number of samples (columns)
                       sparsity: float = 0.95, # fraction of zeros in the data (default 0.95)
                       max_expression: int = 100, # maximum expression level for any transcript in any sample
                       seed: int = 0 # random seed for reproducibility
                      ) -> DataFrame : # DataFrame with simulated transcript expression data for demonstration purposes.
    """
    Simulate transcript expression data with the same layout as a SiCeLoRe isomatrix.
    Allows specifying the number of genes, transcripts per gene, and samples.
    """
    # Set random seed for reproducibility
    np.random.seed(seed)
    
    # Calculate total number of transcripts
    total_transcripts = num_genes * num_transcripts_per_gene
    
    # Generate random data
    data = np.random.rand(total_transcripts, num_samples)
    
    # Apply sparsity: keep roughly (1 - sparsity) of the entries and zero out the rest
    keep_mask = np.random.rand(total_transcripts, num_samples) > sparsity
    data[~keep_mask] = 0
    
    # Scale data to have values up to max_expression
    data = np.ceil(data * max_expression).astype(int)
    
    # Generate transcript and sample labels
    transcript_ids = [f"ENSMUST00000{str(i).zfill(6)}_{str(j).zfill(6)}_{str(seed).zfill(6)}.1" for j in range(num_genes) for i in range(1, num_transcripts_per_gene + 1)]
    gene_ids = [f"Gene_{(i // num_transcripts_per_gene) + 1}" for i in range(total_transcripts)]
    nb_exons = np.random.randint(1, 21, total_transcripts)  # Assuming 1-20 exons based on typical gene structures
    sample_ids = [f"CACCTACACGTCAAC{str(i).zfill(2)}" for i in range(1, num_samples + 1)]
    
    # Create DataFrame
    df = pd.DataFrame(data, index=gene_ids, columns=sample_ids)
    df.index.name = 'geneId'  # Add index name
    df.insert(0, 'transcriptId', transcript_ids)
    df.insert(1, 'nbExons', nb_exons)
    
    return df


The Isomatrix Tools module can also simulate several isomatrix datasets at once, which is useful for testing and demonstrating downstream analysis tools. Each simulated dataset follows the structure of a real isomatrix, with a configurable number of genes, transcripts per gene, and samples, as well as a configurable sparsity and maximum expression level. The `simulate_and_save_isomatrices` function writes a series of isomatrix files to a specified directory, can return the paths of the generated files, and can print progress messages during the simulation.
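The save-and-reload round trip can be sketched with pandas alone. The small random matrices and labels below are placeholders, and the per-file seed (base seed plus index) follows the approach used in this notebook.

```python
import os
import tempfile
import numpy as np
import pandas as pd

out_dir = tempfile.mkdtemp()
base_seed = 0
paths = []

for i in range(3):
    rng = np.random.default_rng(base_seed + i)   # a different seed per matrix
    df = pd.DataFrame(
        rng.integers(0, 10, size=(4, 2)),
        index=pd.Index(["Gene_1", "Gene_1", "Gene_2", "Gene_2"], name="geneId"),
        columns=["CELL_A", "CELL_B"],
    )
    df.insert(0, "transcriptId", [f"T{j}" for j in range(1, 5)])

    # Tab-separated output with geneId as the index column.
    path = os.path.join(out_dir, f"isomatrix_{i + 1}.txt")
    df.to_csv(path, sep="\t")
    paths.append(path)

# Reading with index_col=0 restores geneId as the index.
reloaded = pd.read_csv(paths[0], sep="\t", index_col=0)
print(reloaded.columns.tolist())   # ['transcriptId', 'CELL_A', 'CELL_B']
```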


In [7]:
#| export
def simulate_and_save_isomatrices(num_isomatrix: int, # number of isomatrices to generate
                                num_genes: int, # number of genes (groups of rows)
                                num_transcripts_per_gene: int, # number of transcripts per gene
                                num_samples: int, # number of samples (columns)
                                sparsity: float = 0.95, # fraction of zeros in the data (default 0.95)
                                max_expression: int = 100, # maximum expression level for any transcript in any sample
                                seed: int = 0, # random seed for reproducibility
                                output_dir: str = './', # directory to save the generated isomatrix txt files
                                return_paths: bool = False, # return paths to the isomatrices as a list of strings if True
                                verbose: bool = False # print progress messages if True
                               ) -> list: # list of paths to the simulated matrices (only if return_paths is True)
    
    """
    Simulate multiple isomatrices and save them as txt files in the specified directory.
    If return_paths is True, return a list of paths to the saved isomatrix files.
    """
    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)
    
    output_files = []
    for i in range(num_isomatrix):
        # Generate isomatrix
        df = simulate_isomatrix(num_genes, num_transcripts_per_gene, num_samples, sparsity, max_expression, seed+i)
        
        # Save to txt file
        output_file = os.path.join(output_dir, f'isomatrix_{i+1}.txt')
        df.to_csv(output_file, sep='\t')
        
        if verbose:
            print(f'Isomatrix {i+1} saved to {output_file}')
        output_files.append(output_file)
    
    if return_paths:
        return output_files

In [8]:
#| export
def convert_and_save_file(sample: str, # path to the isomatrix txt file
                          verbose: bool, # print progress messages if True
                          sparse: bool = False # store the anndata object in sparse format if True
                         ) -> str: # path to the converted h5ad file
    """
    Convert an isomatrix txt file to an AnnData h5ad file and save it to disk.
    If the file already exists, it is overwritten.
    A delay and retry mechanism is implemented to handle file locking issues.
    """
    anndata = isomatrix_to_anndata(sample, sparse=sparse)
    # Swap only the file extension; str.replace would also alter '.txt' elsewhere in the path
    h5ad_file = os.path.splitext(sample)[0] + '.h5ad'
    
    # Check if the file already exists and delete it if it does
    if os.path.exists(h5ad_file):
        os.remove(h5ad_file)
    
    # Add a delay and retry mechanism to handle file locking issues
    for attempt in range(10):
        try:
            anndata.write_h5ad(h5ad_file)
            break
        except BlockingIOError:
            if attempt < 9:  # i.e. if this is not the last attempt
                time.sleep(1)  # wait for 1 second before the next attempt
            else:
                raise  # re-raise the last exception if all attempts fail
    
    if verbose:
        print(f"File {h5ad_file} was successfully written to disk.")
    
    return h5ad_file


The Isomatrix Tools module provides functions for converting isomatrix files into AnnData objects suitable for downstream single-cell analysis. The `multiple_isomatrix_conversion` function processes batches of isomatrix files in parallel, and the converted AnnData objects can optionally be stored in sparse format to reduce memory usage. The module also handles file I/O, such as removing an existing output file before writing and retrying the write to work around file locking. Finally, it can standardize feature sets across multiple datasets so that subsequent analyses are consistent.
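The retry mechanism can be sketched in isolation. The helper `write_with_retry` and the `flaky_write` stand-in are hypothetical names introduced for this illustration; they are not part of the module.

```python
import time

def write_with_retry(write, attempts=10, delay=0.0):
    """Call write() until it succeeds or the attempts are exhausted."""
    for attempt in range(attempts):
        try:
            return write()
        except BlockingIOError:
            if attempt == attempts - 1:
                raise              # out of attempts: surface the last error
            time.sleep(delay)      # wait before trying again

# Stand-in for a write that fails twice because the file is locked.
calls = {"n": 0}

def flaky_write():
    calls["n"] += 1
    if calls["n"] < 3:
        raise BlockingIOError("file is locked")
    return "written"

result = write_with_retry(flaky_write)
print(result, calls["n"])   # written 3
```

Only `BlockingIOError` is retried, so unrelated failures such as a missing directory still surface immediately.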


In [9]:
#| export
def multiple_isomatrix_conversion(file_paths: list, # A list of file paths to be converted.
                                  verbose: bool = False, # If True, print progress messages.
                                  return_paths: bool = False, # If True, return a list of paths to the converted files.
                                  sparse: bool = False # If True, the anndata object will be stored in sparse format.
                                  ) -> list: # A list of paths to the converted files.
    """
    This function takes a list of file paths, converts each file from isomatrix to anndata format, 
    and saves the converted file in the same location with the same name but with a .h5ad extension.
    If return_paths is True, it returns a list of paths to the converted files.
    If sparse is True, the anndata object will be stored in sparse format.
    """
    with Pool() as p:
        converted_files = p.map(partial(convert_and_save_file, verbose=verbose, sparse=sparse), file_paths)
    
    if return_paths:
        return converted_files


from tqdm import tqdm



In [10]:
#| hide
#| export
def load_and_set_var_names(path):
    dataset = sc.read_h5ad(path, backed='r')  # Read the file in 'backed' mode to avoid loading the whole data into memory
    dataset.var_names = dataset.var['transcriptId']
    return dataset


The Isomatrix tools module includes advanced functionality for standardizing transcript features across multiple isoform matrix datasets. This process is crucial for ensuring that subsequent analyses are consistent and comparable. The module provides the option to standardize features by either taking the union or the intersection of all transcripts present in the datasets. The 'union' method combines all unique transcripts from each dataset, thus preserving the full range of features. In contrast, the 'intersection' method retains only those transcripts that are common to all datasets, which can be beneficial for comparative studies where only shared features are of interest. This flexibility allows researchers to tailor the standardization process to their specific analytical needs and the nature of their datasets.
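The difference between the two methods can be sketched with plain pandas DataFrames standing in for the count matrices (cells as rows, transcripts as columns). The cell and transcript names are made up for the example.

```python
import pandas as pd

a = pd.DataFrame([[1, 2], [3, 4]], index=["cell1", "cell2"], columns=["T1", "T2"])
b = pd.DataFrame([[5, 6], [7, 8]], index=["cell3", "cell4"], columns=["T2", "T3"])

union = sorted(set(a.columns) | set(b.columns))    # every transcript seen anywhere
common = sorted(set(a.columns) & set(b.columns))   # transcripts shared by all

# Union: add the missing transcripts as all-zero columns.
a_union = a.reindex(columns=union, fill_value=0)
b_union = b.reindex(columns=union, fill_value=0)

# Intersection: keep only the shared transcripts.
a_common, b_common = a[common], b[common]

print(a_union.columns.tolist())   # ['T1', 'T2', 'T3']
print(a_common.columns.tolist())  # ['T2']
```

After either step both matrices share the same columns in the same order, which is what makes a later row-wise concatenation well defined.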


In [11]:
#| export
def feature_set_standardization(adatas:list, # list of AnnData objects or paths to AnnData files
                                standardization_method:str = 'union' # method used to standardize the features: 'union' or 'intersection'
                                ) -> list: # list of AnnData objects with standardized features
    """
    Standardize the feature sets of multiple AnnData objects.
    
    This function takes a list of AnnData objects or paths to AnnData files and a standardization method as input.
    The standardization method can be either 'union' or 'intersection'.
    If 'union' is chosen, the function will create a union of all features across all AnnData objects.
    If 'intersection' is chosen, the function will create an intersection of all features across all AnnData objects.
    The function returns a list of standardized AnnData objects.
    """
    # Check if the first element in adatas is a string
    if isinstance(adatas[0], str):
        # If it is, load anndata objects from paths
        adatas = [load_and_set_var_names(path) for path in adatas]
        for adata in adatas:
            if isinstance(adata.X, csr_matrix):
                adata.X = adata.X.toarray()
    all_features = set()
    common_features = set()
    
    # Get union or intersection of all feature sets across all AnnData objects
    if standardization_method == 'union':
        all_features = set().union(*[set(adata.var.itertuples(index=False, name=None)) for adata in adatas])
    elif standardization_method == 'intersection':
        common_features = set.intersection(*[set(adata.var['transcriptId']) for adata in adatas])
    else:
        raise ValueError("standardization_method should be 'union' or 'intersection'")
    

    standardized_adatas = []
    # Iterate over each AnnData object
    for dataset in tqdm(adatas, desc= f"Standardizing anndata features via {standardization_method}"):
        dataset.var_names = dataset.var['transcriptId']
        existing_var = dataset.var
        # Identify features that are in the union/intersection but not in the current AnnData object
        missing_features = all_features - set(dataset.var.itertuples(index=False, name=None))
        if standardization_method == 'union':
            if missing_features:
                # Create a DataFrame of zeros with rows equal to the number of observations in the current AnnData object
                # and columns equal to the number of missing features
                zero_data = np.zeros((dataset.n_obs, len(missing_features)), dtype=np.float32)
                zero_df = pd.DataFrame(zero_data, index=dataset.obs_names, columns=pd.Index([t[1] for t in missing_features], name='transcriptId'))

                # Merge the zero_df with the dataset's .to_df() DataFrame along the columns
                combined_df = pd.concat([dataset.to_df(), zero_df], axis=1)

                # Convert the combined DataFrame back to an AnnData object
                combined_data = sc.AnnData(combined_df, obs=dataset.obs, var=pd.DataFrame(index=combined_df.columns))

                missing_df = pd.DataFrame(list(missing_features), columns=['geneId', 'transcriptId', 'nbExons'])
                missing_df.set_index('transcriptId', inplace=True)

                combined_data.var = pd.concat([existing_var, missing_df], axis=0)
                combined_data.var['transcriptId'] = combined_data.var_names
                standardized_adatas.append(combined_data)
            else:
                # If no features are missing, append the dataset as is
                standardized_adatas.append(dataset)
        elif standardization_method == 'intersection':
            # Subset the dataset to only include transcriptIds that are in the intersection
            dataset = dataset[:, dataset.var_names.isin(common_features)]
            standardized_adatas.append(dataset)
    return standardized_adatas

The Isomatrix Tools module includes a function that prepares an AnnData object for being written to disk as an .h5ad file with AnnData's `write_h5ad` method. The function fills missing values in the `.var` and `.obs` DataFrames, converts object and categorical columns to strings, and casts the observation and variable names to strings. It also checks for duplicate names and for NaN values in the `.X` data matrix; these are reported as messages when `verbose` is True but are not corrected. Running this check before saving helps avoid write errors and keeps the file compatible with downstream tools that read the .h5ad format.
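The kind of clean-up described above can be sketched on a small annotation table. This is a simplified illustration with made-up values, not the exact logic of `check_anndata_for_saving`.

```python
import numpy as np
import pandas as pd

var = pd.DataFrame(
    {"geneId": ["Gene_1", None, "Gene_2"], "nbExons": [3.0, np.nan, 7.0]},
    index=["T1", "T2", "T3"],
)

for col in var.columns:
    if pd.api.types.is_numeric_dtype(var[col]):
        # Numeric columns: fill gaps with the column median.
        var[col] = var[col].fillna(var[col].median())
    else:
        # Text columns: fill gaps with a placeholder and store plain strings.
        var[col] = var[col].fillna("unknown").astype(str)

# Duplicate names are only detected here, not repaired.
has_duplicates = var.index.duplicated().any()

print(var["geneId"].tolist())    # ['Gene_1', 'unknown', 'Gene_2']
print(var["nbExons"].tolist())   # [3.0, 5.0, 7.0]
```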


In [12]:
#| export


def check_anndata_for_saving(adata: AnnData, # The AnnData object to check.
                               verbose: bool = False # If True, print status messages. Defaults to False.
                               ):
    """
    Prepare an AnnData object for saving by ensuring proper data types and handling missing values.
    """
    # Function to check and convert data types in a DataFrame
    def convert_df(df):
        """
        Check and convert the data types of a DataFrame, filling missing values.
        
        Parameters:
        df (DataFrame): The DataFrame to process.
        """
        for col in df.columns:
            if df[col].isnull().any():
                if verbose:
                    print(f"Missing values found in '{col}'. Filling with 'unknown' or median.")
                # isinstance check replaces is_categorical_dtype, which is deprecated in recent pandas
                if isinstance(df[col].dtype, pd.CategoricalDtype) or pd.api.types.is_object_dtype(df[col]):
                    df[col] = df[col].astype('category').cat.add_categories(['unknown']).fillna('unknown')
                else:
                    df[col] = df[col].fillna(df[col].median())
            # Store object and categorical columns as plain strings
            if pd.api.types.is_object_dtype(df[col]) or isinstance(df[col].dtype, pd.CategoricalDtype):
                df[col] = df[col].astype(str)
    
    # Process .var DataFrame if it exists
    if adata.var is not None:
        if verbose:
            print("Processing .var DataFrame")
        convert_df(adata.var)
    
    # Process .obs DataFrame if it exists
    if adata.obs is not None:
        if verbose:
            print("Processing .obs DataFrame")
        convert_df(adata.obs)
    
    # Ensure observation and variable names are strings
    adata.obs_names = adata.obs_names.astype(str)
    adata.var_names = adata.var_names.astype(str)

    # Check for and warn about duplicate names
    if adata.obs_names.duplicated().any() and verbose:
        print("Duplicate obs_names found, consider making them unique.")
    if adata.var_names.duplicated().any() and verbose:
        print("Duplicate var_names found, consider making them unique.")
    
    # Check if .X contains NaN values and handle if necessary, accounting for sparse matrices
    if issparse(adata.X):
        if np.isnan(adata.X.data).any() and verbose:
            print("NaN values found in sparse .X data. Consider handling them.")
            # Handle NaN values in sparse matrix data if necessary
    else:
        if np.isnan(adata.X).any() and verbose:
            print("NaN values found in .X, consider handling them.")
            # Handle NaN values in .X if necessary
    
    # Final message if verbose
    if verbose:
        print("Preparation complete.")


This module provides functionality to concatenate multiple AnnData objects, which are essential data structures for storing large-scale single-cell genomics data. The concatenation process is designed to combine data from different batches or experiments, while ensuring that the resulting AnnData object maintains the integrity and structure necessary for downstream analysis. The Isomatrix tools facilitate this process by handling discrepancies in data types, filling missing values, and standardizing feature sets across all input datasets. This ensures that the concatenated dataset is ready for high-throughput computational analyses, such as clustering, visualization, and differential expression testing, which are common in single-cell genomics workflows.
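Batch keys identify which input each cell came from, so they need to be unique. The sketch below derives a key from each file's parent directory and adds a numeric suffix to repeated keys; the paths and the helper name `dedupe` are hypothetical.

```python
import os

paths = [
    "runs/sampleA/isomatrix.h5ad",
    "runs/sampleB/isomatrix.h5ad",
    "archive/sampleA/isomatrix.h5ad",
]

# Use the name of the parent directory as the raw batch key.
raw_keys = [os.path.basename(os.path.dirname(p)) for p in paths]

def dedupe(keys):
    """Append _1, _2, ... to keys that have already been used."""
    seen, unique = set(), []
    for key in keys:
        candidate, counter = key, 1
        while candidate in seen:
            candidate = f"{key}_{counter}"
            counter += 1
        seen.add(candidate)
        unique.append(candidate)
    return unique

batch_keys = dedupe(raw_keys)
print(batch_keys)   # ['sampleA', 'sampleB', 'sampleA_1']
```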


In [13]:
#| export
def make_unique_batch_keys(batch_keys):
    unique_keys = set()
    final_keys = []
    for key in batch_keys:
        original_key = key
        counter = 1
        while key in unique_keys:
            key = f"{original_key}_{counter}"
            counter += 1
        unique_keys.add(key)
        final_keys.append(key)
    return final_keys



In [14]:
#| export

def concatenate_anndata(h5ad_inputs: list, # A list of AnnData objects or paths to .h5ad files.
                         standardization_method='union', # The method to standardize the feature sets across all AnnData objects. It can be either 'union' or 'intersection'. Default is 'union'.
                         sparse=False, # Optional flag to convert the final matrix to sparse. Default is False.
                         verbose=False # Optional flag to print progress updates. Default is False.
                         ) -> ad.AnnData: # The concatenated AnnData object.
    """
    This function concatenates multiple AnnData objects into a single AnnData object and adds batch keys to identify the origin of each sample.
    """
    
    # Check if inputs are paths or actual anndata objects
    if isinstance(h5ad_inputs[0], str):
        if verbose: print("Reading .h5ad files...")
        to_concat = [ad.read_h5ad(path) for path in h5ad_inputs]
        # Use the name of each file's parent directory as its batch key
        batch_keys = [os.path.basename(os.path.dirname(path)) for path in h5ad_inputs]
    else:
        if verbose: print("Processing AnnData objects...")
        to_concat = h5ad_inputs
        # In-memory objects carry no path, so fall back to positional batch keys
        batch_keys = [f"batch_{i}" for i in range(len(h5ad_inputs))]

    # Ensure batch keys are unique, appending a numeric suffix if necessary
    batch_keys = make_unique_batch_keys(batch_keys)

    # If .X is sparse, convert to dense
    for adata in to_concat:
        if isinstance(adata.X, csr_matrix):
            adata.X = adata.X.toarray()

    # Apply feature set standardization
    if verbose: print("Applying feature set standardization...")
    to_concat = feature_set_standardization(to_concat, standardization_method)

    # Concatenate the AnnData objects with anndata.concat, labelling each observation with its batch key
    if verbose: print("Concatenating AnnData objects and adding batch keys...")
    concat_anndata = ad.concat(to_concat, join='outer', label='batch', keys=batch_keys)

    # Set the .var attribute of the concatenated AnnData object to be the same as the first input AnnData object
    if verbose: print("Setting .var attribute...")
    concat_anndata.var = to_concat[0].var
    concat_anndata.var = concat_anndata.var.astype(str)
    concat_anndata.obs = concat_anndata.obs.astype(str)

    # If the sparse flag is True, convert the final matrix to sparse
    if sparse:
        if verbose: print("Converting matrix to sparse...")
        try:
            concat_anndata.X = csr_matrix(concat_anndata.X.astype(np.float32))
        except Exception as e:
            print(f"Error converting to sparse matrix: {e}")

    # Convert the data to float32 to avoid TypeError when writing to .h5ad file
    concat_anndata.X = concat_anndata.X.astype(np.float32)
    
    if verbose: print("Final Check...")
    check_anndata_for_saving(concat_anndata)
    

    if verbose: print("Concatenation complete.")
    return concat_anndata

In [15]:
#| hide
import nbdev; nbdev.nbdev_export()