In [None]:
pipeline = run_tcr_pipeline()

TCR Analysis Pipeline
TCR Analysis Pipeline Configuration


VBox(children=(HTML(value="<h3 style='margin: 5px 0;'>Dataset & Samples</h3>"), Dropdown(description='Dataset:…

In [None]:
import os
import subprocess
import pandas as pd
from pathlib import Path
import gzip
import shutil
import json
import time
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets
from ipywidgets import interact, interactive, fixed, interact_manual
from tqdm import tqdm
import requests

class ColabTCRPipeline:
    def __init__(self, output_dir="tcr_analysis", threads=2):
        self.output_dir = output_dir
        self.threads = threads
        self.reference_genome = "hg38_bcrtcr.fa"
        Path(self.output_dir).mkdir(parents=True, exist_ok=True)

        self.datasets = {
            'SRA_Immunology': {
                'name': 'SRA Immunology Dataset',
                'samples': [
                    {'id': 'SRR1918236', 'name': 'Patient_1', 'paired': True},
                    {'id': 'SRR1918241', 'name': 'Patient_2', 'paired': True},
                    {'id': 'SRR1918242', 'name': 'Patient_3', 'paired': True},
                    {'id': 'SRR1918243', 'name': 'Control_1', 'paired': True}
                ]
            }
        }

    def display_config_interface(self):
        """Build and display the interactive (ipywidgets) configuration form.

        The form lets the user pick samples from the built-in
        ``SRA_Immunology`` dataset or register custom samples, choose the
        segmentation and analysis parameters, and start a run.  Pressing
        "Run Analysis" copies the chosen settings onto ``self`` (``threads``,
        ``output_dir``, ``reference_genome``), creates the output directory
        and calls ``self.run_pipeline``; progress messages and the resulting
        summary table appear in the status area below the button.

        Returns:
            None.  The widget tree is rendered with ``display``.
        """
        print("TCR Analysis Pipeline Configuration")
        print("=" * 50)

        # One label width for every widget so the description column lines up.
        label_style = {'description_width': '120px'}

        # Dataset selection
        dataset_dropdown = widgets.Dropdown(
            options=['SRA_Immunology', 'Custom'],
            value='SRA_Immunology',
            description='Dataset:',
            style=label_style,
            layout=widgets.Layout(width='400px')
        )

        # Sample selection for predefined datasets
        sample_options = [(f"{s['name']} ({s['id']})", s['id'])
                         for s in self.datasets['SRA_Immunology']['samples']]
        sample_selector = widgets.SelectMultiple(
            options=sample_options,
            description='Samples:',
            style=label_style,
            layout=widgets.Layout(width='400px', height='120px')
        )

        # Custom sample inputs
        custom_sample_id = widgets.Text(
            description='Sample ID:',
            placeholder='e.g., SRR1234567',
            style=label_style,
            layout=widgets.Layout(width='400px')
        )

        custom_sample_name = widgets.Text(
            description='Sample Name:',
            placeholder='e.g., Custom_Sample_1',
            style=label_style,
            layout=widgets.Layout(width='400px')
        )

        custom_paired = widgets.Checkbox(
            value=True,
            description='Paired-end reads',
            style=label_style
        )

        add_custom_button = widgets.Button(
            description='Add Custom Sample',
            button_style='info',
            layout=widgets.Layout(width='150px')
        )

        custom_samples_list = widgets.Textarea(
            description='Added Samples:',
            value='',
            disabled=True,
            style=label_style,
            layout=widgets.Layout(height='80px', width='400px')
        )

        # Custom samples registered through the form (shared with the handlers below)
        custom_samples = []

        # Segmentation settings
        num_segments = widgets.IntText(
            value=1,
            description='Segments:',
            style=label_style,
            layout=widgets.Layout(width='200px')
        )

        selected_segment = widgets.IntSlider(
            value=0, min=0, max=0, step=1,
            description='Use Segment:',
            style=label_style,
            layout=widgets.Layout(width='300px')
        )

        # Analysis parameters
        threads_slider = widgets.IntSlider(
            value=2, min=1, max=8, step=1,
            description='Threads:',
            style=label_style,
            layout=widgets.Layout(width='300px')
        )

        output_prefix = widgets.Text(
            value='tcr_analysis',
            description='Output Dir:',
            style=label_style,
            layout=widgets.Layout(width='300px')
        )

        reference_dropdown = widgets.Dropdown(
            options=['hg38', 'hg19', 'mm10'],
            value='hg38',
            description='Reference:',
            style=label_style,
            layout=widgets.Layout(width='250px')
        )

        # Run button
        run_button = widgets.Button(
            description='Run Analysis',
            button_style='success',
            layout=widgets.Layout(width='200px', height='35px')
        )

        # Status output
        status_output = widgets.Output()

        # Event handlers
        def update_sample_display(*args):
            # Show either the predefined sample list or the custom-sample form.
            # custom_container is created further down; this closure only
            # runs after that assignment has happened.
            if dataset_dropdown.value == 'Custom':
                sample_selector.layout.display = 'none'
                custom_container.layout.display = 'block'
            else:
                sample_selector.layout.display = 'block'
                custom_container.layout.display = 'none'

        def add_custom_sample_clicked(b):
            # Strip first: whitespace-only input must count as empty, and the
            # sample name later becomes part of output file names.
            sample_id = custom_sample_id.value.strip()
            sample_name = custom_sample_name.value.strip()
            if not (sample_id and sample_name):
                with status_output:
                    print("Please fill in both Sample ID and Sample Name")
                return

            sample_info = {
                'id': sample_id,
                'name': sample_name,
                'paired': custom_paired.value
            }
            custom_samples.append(sample_info)

            sample_text = f"{sample_info['name']} ({sample_info['id']}) - {'Paired' if sample_info['paired'] else 'Single'}\n"
            custom_samples_list.value += sample_text

            # Clear inputs
            custom_sample_id.value = ''
            custom_sample_name.value = ''
            custom_paired.value = True

        def update_segment_range(*args):
            # Keep the segment slider inside [0, num_segments - 1]; a
            # non-positive segment count is reset to 1.
            if num_segments.value > 0:
                selected_segment.max = max(0, num_segments.value - 1)
                if selected_segment.value >= num_segments.value:
                    selected_segment.value = max(0, num_segments.value - 1)
            else:
                num_segments.value = 1

        def on_run_clicked(b):
            with status_output:
                clear_output()
                print("Starting analysis...")

                # Get selected samples
                if dataset_dropdown.value == 'Custom':
                    selected_samples = custom_samples.copy()
                else:
                    selected_sample_ids = set(sample_selector.value)
                    selected_samples = [
                        s for s in self.datasets['SRA_Immunology']['samples']
                        if s['id'] in selected_sample_ids
                    ]

                if not selected_samples:
                    print("Please select at least one sample")
                    return

                if num_segments.value < 1:
                    print("Number of segments must be at least 1")
                    return

                # A blank directory would make the "<output_dir>/<file>" paths
                # built by the pipeline start with "/", i.e. point at the
                # filesystem root, so refuse it up front.
                output_dir = output_prefix.value.strip()
                if not output_dir:
                    print("Please provide an output directory")
                    return

                # Update configuration
                self.threads = threads_slider.value
                self.output_dir = output_dir
                self.reference_genome = f"{reference_dropdown.value}_bcrtcr.fa"

                Path(self.output_dir).mkdir(parents=True, exist_ok=True)

                # Run pipeline
                try:
                    results, summary = self.run_pipeline(
                        selected_samples,
                        num_segments.value,
                        selected_segment.value
                    )
                    print("Analysis completed successfully!")
                    if summary is not None and not summary.empty:
                        display(summary)
                    else:
                        print("No summary data available")

                except Exception as e:
                    # UI boundary: report the failure in the status area
                    # instead of letting it escape the widget callback.
                    print(f"Analysis failed: {str(e)}")

        # Bind events
        dataset_dropdown.observe(update_sample_display, 'value')
        add_custom_button.on_click(add_custom_sample_clicked)
        num_segments.observe(update_segment_range, 'value')
        run_button.on_click(on_run_clicked)

        # Create containers with minimal spacing
        custom_container = widgets.VBox([
            widgets.HTML("<b>Add Custom Samples</b>"),
            custom_sample_id,
            custom_sample_name,
            custom_paired,
            add_custom_button,
            custom_samples_list
        ], layout=widgets.Layout(display='none', margin='5px 0'))

        # Main interface with compact layout
        config_box = widgets.VBox([
            widgets.HTML("<h3 style='margin: 5px 0;'>Dataset & Samples</h3>"),
            dataset_dropdown,
            sample_selector,
            custom_container,
            widgets.HTML("<h3 style='margin: 10px 0 5px 0;'>Data Segmentation</h3>"),
            widgets.HBox([num_segments, selected_segment]),
            widgets.HTML("<h3 style='margin: 10px 0 5px 0;'>Analysis Parameters</h3>"),
            widgets.HBox([threads_slider]),
            widgets.HBox([output_prefix, reference_dropdown]),
            widgets.HTML("<div style='margin: 10px 0;'></div>"),
            run_button,
            status_output
        ], layout=widgets.Layout(margin='0', padding='10px'))

        # Initialize display
        update_sample_display()
        display(config_box)

    def setup_environment(self):
        """Setup TRUST4 environment - simplified for demo"""
        print("ðŸ”§ Setting up analysis environment...")

        # Create minimal reference file for demo
        if not os.path.exists(self.reference_genome):
            with open(self.reference_genome, 'w') as f:
                f.write(">TCR_reference\nACGTACGTACGTACGTACGT\n")

        print("Environment setup complete!")

    def download_sample_placeholder(self, sample_id, sample_name, is_paired=True):
        """Download sample data - placeholder implementation for demo"""
        print(f"ðŸ“¥ Setting up placeholder for {sample_name} ({sample_id})")

        try:
            if is_paired:
                # Create minimal placeholder FASTQ files
                for read_type in ['R1', 'R2']:
                    placeholder_file = f"{sample_name}_{read_type}.fastq.gz"
                    with gzip.open(placeholder_file, 'wt') as f:
                        # Create more realistic placeholder data
                        for i in range(1000):  # 1000 reads
                            f.write(f"@read_{i}\n")
                            f.write("ACGTACGTACGTACGTACGTACGTACGTACGT\n")
                            f.write("+\n")
                            f.write("IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII\n")
                return True
            else:
                placeholder_file = f"{sample_name}.fastq.gz"
                with gzip.open(placeholder_file, 'wt') as f:
                    for i in range(1000):
                        f.write(f"@read_{i}\n")
                        f.write("ACGTACGTACGTACGTACGTACGTACGTACGT\n")
                        f.write("+\n")
                        f.write("IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII\n")
                return True
        except Exception as e:
            print(f"Error creating placeholder files: {e}")
            return False

    def run_trust4(self, sample_name, r1_file, r2_file):
        """Run TRUST4 analysis - demo implementation"""
        print(f"ðŸ”¬ Running analysis on {sample_name}...")

        try:
            output_prefix = f"{self.output_dir}/{sample_name}_trust4"

            # Simulate processing time
            time.sleep(1)

            # Create demo AIRR output
            airr_data = {
                'sequence_id': [f'read_{i}' for i in range(1, 21)],
                'junction_aa': [
                    'CASSLGNTGELFF', 'CASSQDRDTQYF', 'CASRPGQGAYEQYF',
                    'CASSLAGTEAFF', 'CASSLWGDTQYF', 'CASRPRDGELF',
                    'CASSLAGNTEAFF', 'CASSQDRNTQYF', 'CASSLGQGAYEQYF',
                    'CASSLAGGEAFF', 'CASSLWGNTQYF', 'CASRPGDTQYF',
                    'CASSLAGNGELFF', 'CASSQDRTQYF', 'CASRPGQAYEQYF',
                    'CASSLAGTAFF', 'CASSLWGDGELFF', 'CASRPRDGELFF',
                    'CASSLAGQTEAFF', 'CASSQDRNGELFF'
                ],
                'productive': [True] * 18 + [False] * 2,
                'v_call': [f'TRBV{(i%20)+1}' for i in range(20)],
                'j_call': [f'TRBJ{(i%12)+1}' for i in range(20)],
                'read_count': [100, 85, 72, 64, 58, 45, 42, 38, 35, 32,
                              28, 25, 22, 20, 18, 15, 12, 10, 8, 5]
            }

            airr_df = pd.DataFrame(airr_data)
            airr_df.to_csv(f"{output_prefix}_airr.tsv", sep='\t', index=False)

            # Create demo CDR3 output
            with open(f"{output_prefix}_cdr3.out", 'w') as f:
                for i, (cdr3, count) in enumerate(zip(airr_data['junction_aa'], airr_data['read_count'])):
                    f.write(f"{cdr3}\t{count}\n")

            # Create demo report
            with open(f"{output_prefix}_report.tsv", 'w') as f:
                f.write("Sample\tTotal_reads\tTCR_reads\tProductive_TCR\n")
                f.write(f"{sample_name}\t1000\t{len(airr_data['sequence_id'])}\t18\n")

            return True, "Analysis completed successfully"

        except Exception as e:
            return False, f"Analysis failed: {str(e)}"

    def parse_trust4_results(self, sample_name):
        """Parse TRUST4 output files"""
        results = {}
        output_prefix = f"{self.output_dir}/{sample_name}_trust4"

        try:
            # Parse AIRR file
            airr_file = f"{output_prefix}_airr.tsv"
            if os.path.exists(airr_file):
                df = pd.read_csv(airr_file, sep='\t')
                results['airr'] = {
                    'total_sequences': len(df),
                    'unique_cdr3': df['junction_aa'].nunique() if 'junction_aa' in df.columns else 0,
                    'productive_sequences': len(df[df['productive'] == True]) if 'productive' in df.columns else 0,
                    'data': df
                }

            # Parse CDR3 file
            cdr3_file = f"{output_prefix}_cdr3.out"
            if os.path.exists(cdr3_file):
                with open(cdr3_file, 'r') as f:
                    results['cdr3'] = f.read()

            # Parse report file
            report_file = f"{output_prefix}_report.tsv"
            if os.path.exists(report_file):
                with open(report_file, 'r') as f:
                    results['report'] = f.read()

        except Exception as e:
            results['error'] = f"Error parsing results: {str(e)}"

        return results

    def generate_summary_report(self, all_results):
        """Generate summary report"""
        summary_data = []

        for sample_name, results in all_results.items():
            if 'airr' in results and isinstance(results['airr'], dict):
                summary_data.append({
                    'Sample': sample_name,
                    'Total_Sequences': results['airr']['total_sequences'],
                    'Unique_CDR3': results['airr']['unique_cdr3'],
                    'Productive_Sequences': results['airr']['productive_sequences'],
                    'Success_Rate': f"{(results['airr']['productive_sequences']/results['airr']['total_sequences']*100):.1f}%" if results['airr']['total_sequences'] > 0 else "0%"
                })
            else:
                summary_data.append({
                    'Sample': sample_name,
                    'Total_Sequences': 0,
                    'Unique_CDR3': 0,
                    'Productive_Sequences': 0,
                    'Success_Rate': 'Failed'
                })

        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_df.to_csv(f"{self.output_dir}/analysis_summary.csv", index=False)
            return summary_df
        else:
            return pd.DataFrame()

    def run_pipeline(self, samples, num_segments=1, selected_segment=0):
        """Run the complete pipeline"""
        print(f" Starting TCR Analysis Pipeline")
        print(f"   Samples: {len(samples)}")
        print(f"   Segments: {num_segments} (using segment {selected_segment + 1})")


        all_results = {}

        try:
            self.setup_environment()

            for i, sample in enumerate(samples, 1):
                sample_id = sample['id']
                sample_name = sample['name']
                is_paired = sample.get('paired', True)

                print(f"[{i}/{len(samples)}] Processing: {sample_name}")

                # Download/setup sample
                success = self.download_sample_placeholder(sample_id, sample_name, is_paired)
                if not success:
                    print(f"Skipping {sample_name} - setup failed")
                    all_results[sample_name] = {'error': 'Sample setup failed'}
                    continue

                # Run analysis
                r1_file = f"{sample_name}_R1.fastq.gz"
                r2_file = f"{sample_name}_R2.fastq.gz" if is_paired else None

                success, log = self.run_trust4(sample_name, r1_file, r2_file)
                if success:
                    results = self.parse_trust4_results(sample_name)
                    all_results[sample_name] = results
                    print(f"Completed {sample_name}")
                else:
                    print(f"Failed {sample_name}: {log}")
                    all_results[sample_name] = {'error': log}

                print()  # Single line break between samples

            # Generate summary
            summary = self.generate_summary_report(all_results)


            print("ANALYSIS SUMMARY")


            return all_results, summary

        except Exception as e:
            print(f"Pipeline failed: {str(e)}")
            return {}, pd.DataFrame()

# Main function to initialize the pipeline
def run_tcr_pipeline():
    """Print the banner, build a default pipeline and show its configuration form.

    Returns:
        ColabTCRPipeline: The newly created pipeline, returned after its
        interactive interface has been displayed.
    """
    print("TCR Analysis Pipeline")

    tcr_pipeline = ColabTCRPipeline()
    tcr_pipeline.display_config_interface()
    return tcr_pipeline

# Quick start example
def quick_start_example():
    """Run the pipeline non-interactively on two built-in samples.

    Results are written to the ``demo_results`` directory.

    Returns:
        tuple: ``(results, summary)`` as returned by ``run_pipeline``, or
        ``(None, None)`` if an exception was raised while running or
        displaying the demo.
    """
    print("Quick Start TCR Analysis Example")

    demo_runs = (('SRR1918236', 'Patient_1'), ('SRR1918241', 'Patient_2'))
    demo_samples = [
        {'id': accession, 'name': label, 'paired': True}
        for accession, label in demo_runs
    ]

    demo_pipeline = ColabTCRPipeline(output_dir="demo_results", threads=2)

    try:
        results, summary = demo_pipeline.run_pipeline(demo_samples)
        print("\nDemo completed successfully!")
        if summary.empty:
            return results, summary
        display(summary)
        return results, summary

    except Exception as e:
        print(f"Demo failed: {str(e)}")
        return None, None

# Entry point
if __name__ == "__main__":
    # When executed as a script, only print a hint on how to launch the UI.
    for banner_line in (
        " TCR Analysis Pipeline!",
        "\nTo start: pipeline = run_tcr_pipeline()",
    ):
        print(banner_line)

 TCR Analysis Pipeline!

To start: pipeline = run_tcr_pipeline()
