# In [None]:
import os
import gzip
import shutil
import requests
from tqdm import tqdm
import numpy as np
import pandas as pd
import scipy.sparse as sp
import scipy.io
import anndata as ad
from typing import Dict, List, Optional, Tuple, Union


class GSE280853Harmonizer:
    """Download, parse and harmonize the GEO series GSE280853.

    The series provides one MTX/features/barcodes triplet per sample
    (``A`` and ``B``).  Each triplet is split by ``feature_type`` into a
    "Gene Expression" and an "Antibody Capture" AnnData object, annotated
    with a fixed set of metadata columns, and the per-sample objects are
    concatenated into one gene-expression and one protein AnnData.
    """

    # GEO accession number
    GEO_ACCESSION = "GSE280853"

    # URLs for the dataset files
    BASE_URL = "https://ftp.ncbi.nlm.nih.gov/geo/series/GSE280nnn/GSE280853/suppl/"
    FILES = [
        "GSE280853_sampleAB_feature_reference.csv.gz",
        "GSE280853_sampleA_barcodes.tsv.gz",
        "GSE280853_sampleA_features.tsv.gz",
        "GSE280853_sampleA_matrix.mtx.gz",
        "GSE280853_sampleB_barcodes.tsv.gz",
        "GSE280853_sampleB_features.tsv.gz",
        "GSE280853_sampleB_matrix.mtx.gz"
    ]

    # Sample information
    SAMPLE_INFO = {
        "A": {
            "title": "P14, ICD-humanized mouse PD1",
            "condition": "ICD-humanized mouse PD1",
            "perturbation_name": "ICD-humanized mouse PD1"
        },
        "B": {
            "title": "P14, mouse PD1",
            "condition": "wild-type mouse PD1",
            "perturbation_name": "wild-type mouse PD1"
        }
    }

    # ``.obs`` columns that every harmonized AnnData object must carry.
    REQUIRED_OBS_FIELDS = (
        'organism',
        'cell_type',
        'crispr_type',
        'cancer_type',
        'condition',
        'perturbation_name',
    )

    # Network settings for ``download_files``.
    DOWNLOAD_TIMEOUT = 60  # seconds, passed to ``requests.get``
    DOWNLOAD_CHUNK_SIZE = 64 * 1024  # bytes per streamed chunk

    def __init__(self, data_dir: str):
        """
        Initialize the harmonizer.

        Args:
            data_dir: Path to the directory where the data is or will be stored.
                The directory is created if it does not exist.
        """
        self.data_dir = os.path.abspath(data_dir)
        os.makedirs(self.data_dir, exist_ok=True)

    @staticmethod
    def _strip_gz_suffix(file_name: str) -> str:
        """Return ``file_name`` without a trailing ``.gz`` (unchanged if absent)."""
        return file_name[:-3] if file_name.endswith('.gz') else file_name

    @staticmethod
    def _make_unique_names(names, ids) -> List[str]:
        """
        Make feature names unique by suffixing repeats with their feature id.

        The first occurrence of a name is kept as is; every later occurrence
        becomes ``"{name}_{id}"``.

        Args:
            names: Iterable of feature names (may contain repeats).
            ids: Iterable of feature ids, aligned with ``names``.

        Returns:
            List of names in the input order.
        """
        seen = set()
        unique_names = []
        for name, feature_id in zip(names, ids):
            if name in seen:
                unique_names.append(f"{name}_{feature_id}")
            else:
                seen.add(name)
                unique_names.append(name)
        return unique_names

    def download_files(self) -> None:
        """
        Download the dataset files if they don't exist.

        Each file is streamed to a ``.part`` file and only renamed to its
        final name once the transfer finished, so an interrupted download is
        never mistaken for a complete file on the next run.

        Raises:
            requests.RequestException: If a request fails, times out or
                returns an HTTP error status.
        """
        for file_name in self.FILES:
            file_path = os.path.join(self.data_dir, file_name)

            # Skip if file already exists
            if os.path.exists(file_path):
                print(f"File {file_name} already exists. Skipping download.")
                continue

            # Download the file
            url = f"{self.BASE_URL}{file_name}"
            print(f"Downloading {url}...")

            part_path = f"{file_path}.part"
            try:
                with requests.get(url, stream=True, timeout=self.DOWNLOAD_TIMEOUT) as response:
                    response.raise_for_status()
                    total_size = int(response.headers.get('content-length', 0))

                    with open(part_path, 'wb') as f, tqdm(
                        desc=file_name,
                        # Unknown size -> let tqdm show a plain byte counter.
                        total=total_size or None,
                        unit='iB',
                        unit_scale=True,
                        unit_divisor=1024,
                    ) as bar:
                        for data in response.iter_content(self.DOWNLOAD_CHUNK_SIZE):
                            bar.update(f.write(data))

                # Publish the file only after the whole body was written.
                os.replace(part_path, file_path)
            finally:
                # Still present only if something above failed.
                if os.path.exists(part_path):
                    os.remove(part_path)

    def extract_files(self) -> None:
        """
        Extract the gzipped files next to the archives.

        Output is written to a ``.part`` file and renamed on success, so a
        truncated or corrupt archive never leaves a partial extracted file
        that later runs would skip as "already extracted".

        Raises:
            FileNotFoundError: If an archive listed in ``FILES`` is missing.
            EOFError, OSError: If an archive is truncated or not valid gzip.
        """
        for file_name in self.FILES:
            gz_file_path = os.path.join(self.data_dir, file_name)
            out_file_path = os.path.join(self.data_dir, self._strip_gz_suffix(file_name))

            # Skip if the extracted file already exists
            if os.path.exists(out_file_path):
                print(f"File {out_file_path} already exists. Skipping extraction.")
                continue

            print(f"Extracting {gz_file_path}...")
            part_path = f"{out_file_path}.part"
            try:
                with gzip.open(gz_file_path, 'rb') as f_in, open(part_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
                os.replace(part_path, out_file_path)
            finally:
                # Still present only if extraction failed part-way.
                if os.path.exists(part_path):
                    os.remove(part_path)

    def read_10x_mtx(self, sample: str) -> Tuple[sp.csr_matrix, pd.DataFrame, pd.DataFrame]:
        """
        Read a 10x dataset in MTX format.

        Args:
            sample: Sample identifier ('A' or 'B').

        Returns:
            Tuple containing:
                - Sparse matrix of expression values, transposed from the
                  file layout so that rows are barcodes and columns features
                - DataFrame of features with columns ``gene_id``,
                  ``gene_name`` and ``feature_type``
                - DataFrame of barcodes with the single column ``barcode``

        Raises:
            ValueError: If the matrix dimensions do not match the number of
                features/barcodes, or the files do not have the expected
                number of columns.
        """
        # File paths
        prefix = os.path.join(self.data_dir, f"{self.GEO_ACCESSION}_sample{sample}")
        matrix_file = f"{prefix}_matrix.mtx"
        features_file = f"{prefix}_features.tsv"
        barcodes_file = f"{prefix}_barcodes.tsv"

        # The file stores features x barcodes; transpose to barcodes x features.
        matrix = scipy.io.mmread(matrix_file).T.tocsr()

        # Read features and barcodes
        features = pd.read_csv(features_file, sep='\t', header=None)
        features.columns = ['gene_id', 'gene_name', 'feature_type']

        barcodes = pd.read_csv(barcodes_file, sep='\t', header=None)
        barcodes.columns = ['barcode']

        # Fail early with a readable message instead of deep inside AnnData.
        expected_shape = (len(barcodes), len(features))
        if matrix.shape != expected_shape:
            raise ValueError(
                f"Matrix for sample {sample} has shape {matrix.shape}, but the "
                f"barcodes/features files imply {expected_shape}."
            )

        return matrix, features, barcodes

    def _build_modality(self, matrix, features, barcodes, columns, id_column: str) -> "ad.AnnData":
        """
        Build the AnnData object for one feature type.

        Args:
            matrix: Barcodes x features sparse matrix of the whole sample.
            features: Feature table aligned with the columns of ``matrix``.
            barcodes: Barcode table aligned with the rows of ``matrix``.
            columns: Integer positions of the features to keep.
            id_column: Name of the ``.var`` column that receives the feature ids.

        Returns:
            AnnData with the selected columns, barcodes as ``obs_names`` and
            de-duplicated feature names as ``var_names``.
        """
        selected = features.iloc[columns]
        feature_ids = selected['gene_id'].values

        adata = ad.AnnData(
            X=matrix[:, columns],
            obs=pd.DataFrame(index=barcodes['barcode']),
            var=pd.DataFrame(
                index=self._make_unique_names(selected['gene_name'].values, feature_ids)
            )
        )
        adata.var[id_column] = feature_ids
        return adata

    def create_anndata(self, sample: str) -> "Tuple[ad.AnnData, ad.AnnData]":
        """
        Create AnnData objects for gene expression and protein data.

        Args:
            sample: Sample identifier ('A' or 'B').

        Returns:
            Tuple containing:
                - AnnData object for gene expression (``.var['gene_id']``)
                - AnnData object for protein data (``.var['protein_id']``)

            Both carry the ``sample`` column, the standardized metadata
            columns and ``cell_barcode`` (the barcode as read from the file).
        """
        # Read the data
        matrix, features, barcodes = self.read_10x_mtx(sample)

        # Split gene expression and protein data by position
        feature_type = features['feature_type']
        gene_columns = np.flatnonzero((feature_type == 'Gene Expression').to_numpy())
        protein_columns = np.flatnonzero((feature_type == 'Antibody Capture').to_numpy())

        gene_adata = self._build_modality(matrix, features, barcodes, gene_columns, 'gene_id')
        protein_adata = self._build_modality(
            matrix, features, barcodes, protein_columns, 'protein_id'
        )

        sample_info = self.SAMPLE_INFO[sample]
        for adata in (gene_adata, protein_adata):
            # Add sample information
            adata.obs['sample'] = sample

            # Add standardized metadata
            adata.obs['organism'] = 'Mus musculus'
            adata.obs['cell_type'] = 'P14 CD8 T cell'
            adata.obs['crispr_type'] = 'None'
            adata.obs['cancer_type'] = 'Melanoma'
            adata.obs['condition'] = sample_info['condition']
            adata.obs['perturbation_name'] = sample_info['perturbation_name']

            # Keep the raw barcode: obs_names get a sample suffix when the
            # samples are concatenated in ``harmonize``.
            adata.obs['cell_barcode'] = barcodes['barcode'].to_numpy()

        return gene_adata, protein_adata

    @staticmethod
    def _concat_samples(adatas, keys) -> "ad.AnnData":
        """
        Concatenate per-sample AnnData objects along the observation axis.

        ``merge='same'`` keeps ``.var`` columns that agree across samples
        (the default would drop all of them).  ``keys`` with
        ``index_unique='-'`` appends ``-<sample>`` to every barcode so that
        barcodes shared between samples stay distinguishable.
        """
        return ad.concat(
            adatas,
            join='outer',
            merge='same',
            keys=keys,
            index_unique='-',
        )

    def harmonize(self) -> "Tuple[ad.AnnData, ad.AnnData]":
        """
        Harmonize the dataset.

        Downloads and extracts the files if needed, builds the per-sample
        objects for every key of ``SAMPLE_INFO`` and concatenates them.

        Returns:
            Tuple containing:
                - AnnData object for gene expression
                - AnnData object for protein data

        Raises:
            AssertionError: If a required metadata column is missing from
                either combined object.
        """
        # Download and extract files if needed
        self.download_files()
        self.extract_files()

        # Process each sample
        samples = list(self.SAMPLE_INFO)
        gene_adatas = []
        protein_adatas = []

        for sample in samples:
            gene_adata, protein_adata = self.create_anndata(sample)
            gene_adatas.append(gene_adata)
            protein_adatas.append(protein_adata)

        # Combine the samples
        combined_gene_adata = self._concat_samples(gene_adatas, samples)
        combined_protein_adata = self._concat_samples(protein_adatas, samples)

        # Keep a copy of the var_names (as produced here, i.e. after
        # de-duplication) in a column for reference.
        combined_gene_adata.var['original_gene_name'] = combined_gene_adata.var_names
        combined_protein_adata.var['original_protein_name'] = combined_protein_adata.var_names

        # Ensure all required metadata fields are present.  Raised explicitly
        # (rather than via ``assert``) so the check survives ``python -O``.
        for field in self.REQUIRED_OBS_FIELDS:
            for adata in (combined_gene_adata, combined_protein_adata):
                if field not in adata.obs.columns:
                    raise AssertionError(f"Missing required field: {field}")

        return combined_gene_adata, combined_protein_adata

    def save_harmonized_data(self, output_dir: Optional[str] = None) -> None:
        """
        Save the harmonized data to h5ad files.

        Runs ``harmonize`` and writes ``<accession>_gene_expression.h5ad``
        and ``<accession>_protein_expression.h5ad`` (gzip-compressed).

        Args:
            output_dir: Directory to save the output files. If None, uses the data_dir.
        """
        if output_dir is None:
            output_dir = self.data_dir

        os.makedirs(output_dir, exist_ok=True)

        # Harmonize the data
        gene_adata, protein_adata = self.harmonize()

        # Save the data
        gene_output_path = os.path.join(output_dir, f"{self.GEO_ACCESSION}_gene_expression.h5ad")
        protein_output_path = os.path.join(output_dir, f"{self.GEO_ACCESSION}_protein_expression.h5ad")

        print(f"Saving gene expression data to {gene_output_path}...")
        gene_adata.write(gene_output_path, compression='gzip')

        print(f"Saving protein expression data to {protein_output_path}...")
        protein_adata.write(protein_output_path, compression='gzip')

        print("Harmonization complete!")


def run_harmonization(data_dir: str = '/content/GSE280853', output_dir: Optional[str] = None) -> None:
    """
    Run the complete harmonization pipeline in one call (notebook helper).

    Builds a ``GSE280853Harmonizer`` rooted at ``data_dir`` and asks it to
    save the harmonized data.

    Args:
        data_dir: Directory where the data is or will be stored.
        output_dir: Directory to save the output files. If None, uses the data_dir.
    """
    GSE280853Harmonizer(data_dir).save_harmonized_data(output_dir)


# Run the harmonization process with the default arguments:
# data_dir='/content/GSE280853' and output_dir=None (outputs are then written
# into data_dir). Pass explicit 'data_dir' / 'output_dir' values to change this.
# NOTE: this call is unguarded, so it runs every time this cell/module is
# executed and triggers the download, extraction and h5ad export steps.
run_harmonization()
