In [1]:
#!/usr/bin/env python
# Script to perform differential peak accessibility analysis and visualize genomic regions 
# using single-cell ATAC-Seq data in Python
# Equivalent of the R Signac package functionality

# Install packages
# pip install scanpy anndata muon episcanpy pyensembl pyBigWig matplotlib seaborn sklearn pandas
# pip install pybiomart pynndescent scvi-tools

import scanpy as sc
import muon as mu
import episcanpy.api as epi
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import TruncatedSVD
from sklearn.neighbors import NearestNeighbors
import pyBigWig
import pyensembl
import anndata as ad
from scipy import sparse
import scvi

# Load pre-processed ATAC data
# In Python, we would use AnnData/MuData objects rather than Seurat objects
pbmc_atac = mu.read_h5mu("data/pbmc_atac.h5mu")  # Assuming the data is stored in h5mu format
# Alternatively for .h5ad file: pbmc_atac = sc.read_h5ad("data/pbmc_atac.h5ad")

# Plot UMAP embedding
sc.pl.umap(pbmc_atac, color="leiden", legend_loc=None, title="scATAC-Seq")

# Create a gene activity matrix
# Equivalent to Signac's GeneActivity function
def calculate_gene_activity(atac_data, annotation_file=None, upstream=2000, downstream=0):
    """Calculate gene activity scores from ATAC-seq data"""
    # This is a simplified version - in practice you would:
    # 1. Get gene coordinates from a GTF file or similar
    # 2. Define promoter regions (TSS ± upstream/downstream)
    # 3. Count fragments overlapping each promoter/gene body
    
    # For demonstration, we'll create a dummy gene activity matrix
    if annotation_file:
        # Load gene annotations
        genes = pd.read_csv(annotation_file)
        # Map peaks to genes based on proximity
        # ...implementation depends on your annotation format
    
    # For this example, we create a matrix based on peaks near genes
    # In practice, you'd implement a proper peak-to-gene mapping algorithm
    counts = sparse.csr_matrix((atac_data.n_vars, atac_data.n_obs))
    var_names = [f"gene_{i}" for i in range(counts.shape[0])]
    
    # Create AnnData object with gene activities
    gene_activities = ad.AnnData(
        X=counts.T,
        obs=atac_data.obs.copy(),
        var=pd.DataFrame(index=var_names)
    )
    
    return gene_activities

# Calculate gene activity scores
gene_activities = calculate_gene_activity(pbmc_atac)

# In a real implementation, you would use genomic annotations:
# ensembl = pyensembl.EnsemblRelease(75)  # Equivalent to EnsDb.Hsapiens.v75
# gene_activities = calculate_gene_activity(pbmc_atac, ensembl)

# Add gene activity as a new layer/modality
pbmc_atac.mod["RNA"] = gene_activities

# Normalize gene activity scores
sc.pp.normalize_total(pbmc_atac.mod["RNA"], target_sum=1e4)
sc.pp.log1p(pbmc_atac.mod["RNA"])

# Visualize activity of canonical marker genes
marker_genes = ['MS4A1', 'CD3D', 'LEF1', 'NKG7', 'TREM1', 'LYZ']
sc.pl.umap(pbmc_atac.mod["RNA"], color=marker_genes, ncols=3, vmax='p99')

# Integrating with scRNA-Seq data
# Load pre-processed RNA-seq data
pbmc_rna = sc.read_h5ad('data/pbmc_10k_v3.h5ad')  # Assuming converted to h5ad format

# Plot before integration
fig, axs = plt.subplots(1, 2, figsize=(12, 5))
sc.pl.umap(pbmc_atac, color="leiden", title="scATAC-Seq", show=False, ax=axs[0], legend_loc=None)
sc.pl.umap(pbmc_rna, color="celltype", title="scRNA-Seq", show=False, ax=axs[1], legend_loc=None)
plt.tight_layout()
plt.show()

# Transfer labels from RNA-seq to ATAC-seq
# Using scvi-tools for label transfer (alternative to Seurat's TransferAnchors)
# First prepare the data
scvi.data.setup_anndata(pbmc_rna, labels_key="celltype")
scvi.data.setup_anndata(pbmc_atac.mod["RNA"])

# Train a model on RNA reference data
vae = scvi.model.SCVI(pbmc_rna)
vae.train()

# Get reference latent representation
reference_latent = vae.get_latent_representation()

# Train a model on ATAC gene activity data
vae_atac = scvi.model.SCVI(pbmc_atac.mod["RNA"])
vae_atac.train()

# Get query latent representation
query_latent = vae_atac.get_latent_representation()

# Find nearest neighbors between datasets
nn = NearestNeighbors(n_neighbors=10)
nn.fit(reference_latent)
distances, indices = nn.kneighbors(query_latent)

# Transfer labels using weighted voting
cell_types = pbmc_rna.obs["celltype"].values
weights = 1 / (distances + 1e-6)  # Avoid division by zero

predicted_labels = []
prediction_scores = []

for i in range(len(indices)):
    neighbor_labels = cell_types[indices[i]]
    neighbor_weights = weights[i]
    
    # Count weighted votes for each label
    label_votes = {}
    for label, weight in zip(neighbor_labels, neighbor_weights):
        label_votes[label] = label_votes.get(label, 0) + weight
    
    # Find the label with most votes
    best_label = max(label_votes, key=label_votes.get)
    max_score = label_votes[best_label] / sum(label_votes.values())
    
    predicted_labels.append(best_label)
    prediction_scores.append(max_score)

# Add predictions to ATAC-seq data
pbmc_atac.obs["predicted_id"] = predicted_labels
pbmc_atac.obs["prediction_score"] = prediction_scores

# Visualize the results
fig, axs = plt.subplots(1, 2, figsize=(12, 5))
sc.pl.umap(pbmc_atac, color="predicted_id", title="scATAC-Seq with transferred labels", 
           show=False, ax=axs[0], legend_loc=None)
sc.pl.umap(pbmc_rna, color="celltype", title="scRNA-Seq", 
           show=False, ax=axs[1], legend_loc=None)
plt.tight_layout()
plt.show()

# Finding differentially accessible peaks between cell types
# Set the active modality back to ATAC
# Use Scanpy's rank_genes_groups for differential testing

# Get cells from each group
cd4_naive_cells = pbmc_atac.obs[pbmc_atac.obs["predicted_id"] == "CD4 Naive"].index
monocyte_cells = pbmc_atac.obs[pbmc_atac.obs["predicted_id"] == "CD14+ Monocytes"].index

# Create a new object with just these two groups for comparison
compare_groups = pbmc_atac[list(cd4_naive_cells) + list(monocyte_cells)].copy()
compare_groups.obs["comparison_group"] = ["CD4 Naive" if idx in cd4_naive_cells else "CD14+ Monocytes" 
                                        for idx in compare_groups.obs.index]

# Perform differential testing
sc.tl.rank_genes_groups(compare_groups, groupby="comparison_group", reference="CD14+ Monocytes", 
                        method="wilcoxon", pts=True, key_added="da_peaks")

# Get results dataframe
da_results = sc.get.rank_genes_groups_df(compare_groups, group="CD4 Naive", key="da_peaks")
da_results = da_results.sort_values("logfoldchanges", ascending=False)
print(da_results.head())

# Plot top differential peak
top_peak = da_results.iloc[0]["names"]

# Violin plot
sc.pl.violin(compare_groups, top_peak, groupby="comparison_group", 
             groups=["CD4 Naive", "CD14+ Monocytes"])

# Feature plot (on UMAP)
sc.pl.umap(pbmc_atac, color=top_peak)

# Calculate fold changes between groups
def calculate_fold_change(adata, group_key, group1, group2):
    """Calculate log fold change between two groups for all features"""
    adata_subset = adata[adata.obs[group_key].isin([group1, group2])].copy()
    
    # Mean expression in each group
    mean_expr1 = adata_subset[adata_subset.obs[group_key] == group1].X.mean(axis=0)
    mean_expr2 = adata_subset[adata_subset.obs[group_key] == group2].X.mean(axis=0)
    
    # Avoid division by zero
    mean_expr2 = np.maximum(mean_expr2, 1e-6)
    
    # Log fold change
    log_fc = np.log2(mean_expr1 / mean_expr2)
    
    # Create dataframe
    if sparse.issparse(adata.X):
        log_fc = log_fc.A1
    else:
        log_fc = log_fc.flatten()
        
    result = pd.DataFrame({
        'feature': adata.var_names,
        'avg_log2FC': log_fc,
        'mean_expr1': mean_expr1.flatten() if not sparse.issparse(adata.X) else mean_expr1.A1,
        'mean_expr2': mean_expr2.flatten() if not sparse.issparse(adata.X) else mean_expr2.A1
    })
    
    return result.sort_values('avg_log2FC', ascending=False)

# Calculate fold changes
fc = calculate_fold_change(pbmc_atac, "predicted_id", "CD4 Naive", "CD14+ Monocytes")
print(fc.head())

# Plot genomic regions - visualization of coverage
# For this functionality, we would need to use a genomic visualization package
# Here's a simplified example using custom plotting functions:

def plot_coverage(atac_data, peak_region, upstream=40000, downstream=20000, 
                 groupby="predicted_id", height=7, figsize=(12, 8)):
    """
    Plot coverage of a genomic region across different cell groups.
    This is a simplified version of the actual implementation.
    """
    # In practice, you would:
    # 1. Get the coordinates of the peak_region
    # 2. Extend by upstream/downstream amount
    # 3. Extract fragment coverage in this window for each cell group
    # 4. Plot as a stacked track plot
    
    # For this demo, we'll create a simplified plot
    groups = atac_data.obs[groupby].unique()
    
    fig, axes = plt.subplots(len(groups), 1, figsize=figsize, sharex=True, 
                           sharey=True, gridspec_kw={'hspace': 0})
    
    # In a real implementation, we'd compute actual coverage
    for i, group in enumerate(groups):
        ax = axes[i] if len(groups) > 1 else axes
        
        # Simulated coverage data - in reality, you'd calculate this from fragment files
        positions = np.linspace(0, 100, 1000)
        signal = np.random.poisson(10 * np.exp(-0.5 * ((positions - 50) / 10)**2))
        
        ax.fill_between(positions, signal, alpha=0.5)
        ax.set_ylabel(group)
        ax.set_xlim(0, 100)
        
        # Remove x ticks except for bottom plot
        if i < len(groups) - 1:
            ax.set_xticks([])
    
    # Set the title and axis labels
    fig.suptitle(f"Coverage plot for {peak_region}")
    axes[-1].set_xlabel("Genomic position")
    
    plt.tight_layout()
    plt.show()

# Plot coverage for the top differential peak
plot_coverage(pbmc_atac, top_peak)

# In a real-world scenario, you would implement an interactive browser 
# similar to CoverageBrowser using something like bokeh or plotly
def coverage_browser(atac_data, region="CD8A"):
    """
    Interactive coverage browser (simplified placeholder).
    In practice, you would use an interactive plotting library like plotly or bokeh.
    """
    print(f"Interactive browser would show coverage for region: {region}")
    # Implementation would depend on which interactive visualization library you use
    # This would typically generate an HTML file with interactive features

# Example call
coverage_browser(pbmc_atac, region="CD8A")

ModuleNotFoundError: No module named 'muon'