In [1]:
from google.colab import drive
drive.mount('/content/drive')

import os
os.chdir('/content/drive/MyDrive/BBI_CVD_RECLASS/Gene_Maps')

Mounted at /content/drive


In [25]:
import requests
import json
import pandas as pd
from Bio.Seq import Seq
from Bio.Data import CodonTable
import math
import os

def get_gene_reference_data(gene_symbol, transcript_format='ensembl', specific_transcript=None):
    """
    Get gene transcript ID and reference sequences

    Args:
        gene_symbol: Gene symbol (e.g., 'MSH2', 'BRCA1', 'TP53')
        transcript_format: 'ensembl' or 'refseq'
        specific_transcript: Specific transcript ID to use instead of MANE Select
    """
    server = "https://rest.ensembl.org"

    if specific_transcript:
        print(f"Using specific transcript: {specific_transcript}")
        transcript_id = specific_transcript
    else:
        # Get gene information to find MANE Select
        print(f"Getting {gene_symbol} gene information...")
        ext = f"/lookup/symbol/homo_sapiens/{gene_symbol}?expand=1"

        r = requests.get(server + ext,
                        headers={"Content-Type": "application/json"})

        if r.status_code != 200:
            print(f"Error {r.status_code}: Could not find gene '{gene_symbol}'")
            print("Please check the gene symbol and try again")
            return None

        gene_data = r.json()
        print(f"Found {len(gene_data.get('Transcript', []))} transcripts for {gene_symbol}")

        # Find canonical/MANE Select transcript
        mane_transcript = None
        for transcript in gene_data.get('Transcript', []):
            if transcript.get('is_canonical') == 1:
                mane_transcript = transcript
                break

        if not mane_transcript:
            mane_transcript = gene_data['Transcript'][0]
            print(f"Using first transcript as backup: {mane_transcript['id']}")
        else:
            print(f"Found canonical transcript: {mane_transcript['id']}")

        ensembl_transcript_id = mane_transcript['id']

        # Convert to requested format
        if transcript_format.lower() == 'ensembl':
            transcript_id = ensembl_transcript_id
            print(f"Using Ensembl format: {transcript_id}")
        elif transcript_format.lower() == 'refseq':
            # Get the RefSeq ID
            transcript_details = get_transcript_refseq_id(ensembl_transcript_id)
            if transcript_details and 'refseq_id' in transcript_details:
                transcript_id = transcript_details['refseq_id']
                print(f"Using RefSeq format: {transcript_id}")
            else:
                # Try known RefSeq ID
                known_refseq = get_known_refseq_id(gene_symbol)
                if known_refseq:
                    transcript_id = known_refseq
                    print(f"Using known RefSeq ID: {transcript_id}")
                else:
                    print(f"‚ö†Ô∏è Could not find RefSeq ID for {gene_symbol}")
                    print(f"Using Ensembl ID instead: {ensembl_transcript_id}")
                    transcript_id = ensembl_transcript_id
        else:
            raise ValueError("transcript_format must be 'ensembl' or 'refseq'")

    # Get the actual CDS sequence (use Ensembl ID for API call)
    api_transcript_id = specific_transcript if specific_transcript else ensembl_transcript_id
    print(f"Fetching coding sequence for {api_transcript_id}...")
    ext = f"/sequence/id/{api_transcript_id}?type=cds"

    r = requests.get(server + ext,
                    headers={"Content-Type": "text/plain"})

    if r.status_code == 200:
        cds_sequence = r.text.strip()

        # Translate to get protein sequence
        bio_seq = Seq(cds_sequence)
        protein_sequence = str(bio_seq.translate())

        # Remove stop codon if present
        if protein_sequence.endswith('*'):
            protein_sequence = protein_sequence[:-1]

        return {
            'transcript_id': transcript_id,
            'gene_symbol': gene_symbol,
            'cds_sequence': cds_sequence,
            'protein_sequence': protein_sequence,
            'cds_length': len(cds_sequence),
            'protein_length': len(protein_sequence),
            'format': transcript_format
        }
    else:
        print(f"Error getting sequence: {r.status_code}")
        return None

def get_transcript_refseq_id(ensembl_id):
    """Get RefSeq ID for an Ensembl transcript"""
    server = "https://rest.ensembl.org"
    ext = f"/lookup/id/{ensembl_id}?expand=1"

    try:
        r = requests.get(server + ext, headers={"Content-Type": "application/json"})
        if r.status_code == 200:
            data = r.json()
            if 'DBLinks' in data:
                for db_link in data['DBLinks']:
                    if db_link.get('dbname') == 'RefSeq_mRNA':
                        return {'refseq_id': db_link.get('primary_id')}
        return None
    except:
        return None

def get_known_refseq_id(gene_symbol):
    """Get known RefSeq MANE Select IDs for common genes"""
    known_refseq_ids = {
        'MSH2': 'NM_000251.3', 'BRCA1': 'NM_007294.4', 'BRCA2': 'NM_000059.4',
        'TP53': 'NM_000546.6', 'PTEN': 'NM_000314.8', 'VHL': 'NM_000551.4',
        'MLH1': 'NM_000249.4', 'MSH6': 'NM_000179.3', 'PMS2': 'NM_000535.7',
        'APC': 'NM_000038.6', 'ATM': 'NM_000051.4', 'CHEK2': 'NM_007194.4',
        'PALB2': 'NM_024675.4', 'CDH1': 'NM_004360.5'
    }
    return known_refseq_ids.get(gene_symbol.upper())

def generate_nucleotide_variants(transcript_id, cds_sequence):
    """Generate all possible single nucleotide substitutions"""
    nucleotides = ['A', 'T', 'G', 'C']
    variants = []

    print(f"Generating nucleotide variants from {len(cds_sequence)} bp sequence...")

    for pos in range(len(cds_sequence)):
        original_nt = cds_sequence[pos]

        for new_nt in nucleotides:
            if new_nt != original_nt:
                hgvs_cdna = f"{transcript_id}:c.{pos + 1}{original_nt}>{new_nt}"

                variants.append({
                    'transcript_id': transcript_id,
                    'position': pos + 1,
                    'original_nt': original_nt,
                    'variant_nt': new_nt,
                    'hgvs_cdna': hgvs_cdna,
                    'change_type': 'substitution'
                })

    return variants

def generate_all_codon_variants(transcript_id, protein_sequence):
    """
    Generate variants using ALL 64 possible codons (including synonymous and stop)
    This creates comprehensive codon-level variants for each position
    """
    # All 64 possible codons
    nucleotides = ['A', 'T', 'G', 'C']
    all_codons = []
    for nt1 in nucleotides:
        for nt2 in nucleotides:
            for nt3 in nucleotides:
                all_codons.append(nt1 + nt2 + nt3)

    # Standard genetic code for translation
    standard_table = CodonTable.unambiguous_dna_by_id[1]

    variants = []

    print(f"Generating ALL codon variants from {len(protein_sequence)} aa sequence...")
    print(f"Creating variants with all {len(all_codons)} possible codons at each position...")

    for aa_pos in range(len(protein_sequence)):
        original_aa = protein_sequence[aa_pos]  # Actual reference amino acid

        # Calculate corresponding cDNA positions for this codon
        codon_start = aa_pos * 3 + 1  # 1-based position
        codon_end = codon_start + 2

        # Generate variants using ALL 64 possible codons
        for new_codon in all_codons:
            # Translate the new codon to see what amino acid it codes for
            if new_codon in standard_table.forward_table:
                new_aa = standard_table.forward_table[new_codon]
                variant_type = 'missense' if new_aa != original_aa else 'synonymous'
            elif new_codon in standard_table.stop_codons:
                new_aa = '*'  # Stop codon
                variant_type = 'nonsense' if original_aa != '*' else 'synonymous_stop'
            else:
                continue  # Skip invalid codons (shouldn't happen with standard genetic code)

            # HGVS protein nomenclature
            if new_aa == '*':
                hgvs_protein = f"{transcript_id}:p.{original_aa}{aa_pos + 1}Ter"
            else:
                hgvs_protein = f"{transcript_id}:p.{original_aa}{aa_pos + 1}{new_aa}"

            # HGVS cDNA using delins format
            hgvs_cdna = f"{transcript_id}:c.{codon_start}_{codon_end}delins{new_codon}"

            variants.append({
                'transcript_id': transcript_id,
                'aa_position': aa_pos + 1,  # 1-based position
                'codon_start_pos': codon_start,
                'codon_end_pos': codon_end,
                'original_aa': original_aa,
                'variant_aa': new_aa,
                'variant_codon': new_codon,
                'variant_type': variant_type,  # synonymous, missense, nonsense
                'hgvs_protein': hgvs_protein,
                'hgvs_cdna': hgvs_cdna,
                'change_type': variant_type
            })

    return variants

def create_vep_input_variants(nt_df=None, aa_df=None, transcript_id=None, variant_types='both'):
    """Create VEP input variants from DataFrames"""
    vep_variants = []

    if variant_types in ['nucleotide', 'both'] and nt_df is not None:
        print(f"Adding {len(nt_df):,} nucleotide variants...")
        for _, row in nt_df.iterrows():
            vep_variant = f"{transcript_id}:c.{row['position']}{row['original_nt']}>{row['variant_nt']}"
            vep_variants.append(vep_variant)

    if variant_types in ['codon', 'both'] and aa_df is not None:
        print(f"Adding {len(aa_df):,} codon variants...")
        for _, row in aa_df.iterrows():
            vep_variant = f"{transcript_id}:c.{row['codon_start_pos']}_{row['codon_end_pos']}delins{row['variant_codon']}"
            vep_variants.append(vep_variant)

    return vep_variants

def split_vep_input_file(input_file_path, chunk_size=10000, output_prefix="VEP_chunk"):
    """Split a large VEP input file into smaller chunks"""
    print(f"\nSplitting VEP input file: {input_file_path}")
    print(f"Target chunk size: {chunk_size:,} variants")

    with open(input_file_path, 'r') as f:
        lines = f.readlines()

    variant_lines = [line.strip() for line in lines if line.strip() and not line.startswith('#')]
    total_variants = len(variant_lines)
    num_chunks = math.ceil(total_variants / chunk_size)

    print(f"Total variants: {total_variants:,}")
    print(f"Creating {num_chunks} chunk files...")

    chunk_files = []

    for chunk_num in range(num_chunks):
        start_idx = chunk_num * chunk_size
        end_idx = min((chunk_num + 1) * chunk_size, total_variants)

        chunk_variants = variant_lines[start_idx:end_idx]
        chunk_filename = f"{output_prefix}_{chunk_num + 1:02d}_of_{num_chunks:02d}.txt"

        with open(chunk_filename, 'w') as f:
            for variant in chunk_variants:
                f.write(f"{variant}\n")

        chunk_files.append(chunk_filename)
        print(f"  Created {chunk_filename}: {len(chunk_variants):,} variants")

    print(f"\n‚úÖ Successfully created {len(chunk_files)} chunk files")

    # Create summary file
    summary_filename = f"{output_prefix}_summary.txt"
    with open(summary_filename, 'w') as f:
        f.write(f"VEP Input File Splitting Summary\n")
        f.write(f"================================\n\n")
        f.write(f"Original file: {input_file_path}\n")
        f.write(f"Total variants: {total_variants:,}\n")
        f.write(f"Chunk size: {chunk_size:,}\n")
        f.write(f"Number of chunks: {num_chunks}\n\n")
        f.write(f"Chunk Files:\n")
        for i, filename in enumerate(chunk_files):
            start_idx = i * chunk_size
            end_idx = min((i + 1) * chunk_size, total_variants)
            f.write(f"  {filename}: variants {start_idx + 1:,} - {end_idx:,}\n")

    print(f"Created summary: {summary_filename}")
    return chunk_files, summary_filename

def merge_vep_chunk_results(chunk_result_files, output_filename="merged_VEP_results.txt"):
    """Merge multiple VEP result files back into one"""
    print(f"Merging {len(chunk_result_files)} VEP result files...")

    all_lines = []
    header_written = False

    for i, result_file in enumerate(chunk_result_files):
        print(f"  Processing {result_file}...")

        with open(result_file, 'r') as f:
            lines = f.readlines()

        header_line = None
        data_lines = []

        for line in lines:
            if line.startswith('#Uploaded_variation'):
                header_line = line
            elif not line.startswith('#') and line.strip():
                data_lines.append(line)

        if not header_written and header_line:
            all_lines.append(header_line)
            header_written = True

        all_lines.extend(data_lines)
        print(f"    Added {len(data_lines):,} variants")

    with open(output_filename, 'w') as f:
        f.writelines(all_lines)

    total_variants = len([line for line in all_lines if not line.startswith('#')])
    print(f"\n‚úÖ Created merged file: {output_filename}")
    print(f"üìä Total variants: {total_variants:,}")

    return output_filename

def complete_variant_pipeline(gene_symbol,
                            transcript_format='refseq',
                            variant_types='codon',
                            chunk_size=10000,
                            specific_transcript=None,
                            export_dataframes=True):
    """
    Complete pipeline: Generate variant libraries and create VEP input files

    Args:
        gene_symbol: Gene symbol (e.g., 'MSH2', 'BRCA1', 'TP53')
        transcript_format: 'ensembl' or 'refseq'
        variant_types: 'nucleotide', 'codon', or 'both'
        chunk_size: Number of variants per chunk (None to disable chunking)
        specific_transcript: Specific transcript ID instead of MANE Select
        export_dataframes: Whether to export variant DataFrames to CSV

    Returns:
        Dictionary with results and file paths
    """

    print(f"üöÄ COMPLETE {gene_symbol} VARIANT PIPELINE")
    print("=" * 80)
    print(f"üìç Gene: {gene_symbol.upper()}")
    print(f"üìç Transcript format: {transcript_format.upper()}")
    print(f"üìç Variant types: {variant_types.upper()}")
    if variant_types == 'codon':
        print(f"üìç Codon coverage: ALL 64 codons (synonymous + missense + nonsense)")
    if specific_transcript:
        print(f"üìç Specific transcript: {specific_transcript}")
    else:
        print(f"üìç Transcript: MANE Select (auto-detected)")
    if chunk_size:
        print(f"üìç Chunk size: {chunk_size:,} variants")
    else:
        print(f"üìç Chunking: DISABLED")
    print("=" * 80)

    # Initialize chunk_files to an empty list to prevent UnboundLocalError
    chunk_files = [] # Initialize local chunk_files variable
    summary_file = None

    # Step 1: Get reference data
    print("\nüîç STEP 1: Getting reference data...")
    ref_data = get_gene_reference_data(gene_symbol, transcript_format, specific_transcript)
    if not ref_data:
        return None

    transcript_id = ref_data['transcript_id']
    print(f"\n‚úÖ Reference data retrieved:")
    print(f"   - Transcript: {transcript_id}")
    print(f"   - CDS length: {ref_data['cds_length']} bp")
    print(f"   - Protein length: {ref_data['protein_length']} aa")

    # Step 2: Generate variant DataFrames
    print(f"\nüß¨ STEP 2: Generating variant libraries...")

    nt_df = None
    aa_df = None

    if variant_types in ['nucleotide', 'both']:
        nt_variants = generate_nucleotide_variants(transcript_id, ref_data['cds_sequence'])
        nt_df = pd.DataFrame(nt_variants)
        print(f"‚úÖ Generated {len(nt_variants):,} nucleotide variants")

    if variant_types in ['codon', 'both']:
        aa_variants = generate_all_codon_variants(transcript_id, ref_data['protein_sequence'])
        aa_df = pd.DataFrame(aa_variants)
        print(f"‚úÖ Generated {len(aa_variants):,} codon variants")

        # Show variant type breakdown
        if len(aa_variants) > 0:
            variant_breakdown = aa_df['variant_type'].value_counts()
            print(f"   Breakdown:")
            for vtype, count in variant_breakdown.items():
                print(f"   - {vtype}: {count:,} variants")

    # Step 3: Export DataFrames (optional)
    if export_dataframes:
        print(f"\nüìÅ STEP 3: Exporting variant DataFrames...")
        clean_transcript = transcript_id.replace('.', '_')

        if nt_df is not None:
            nt_filename = f"{gene_symbol}_nucleotide_variants_{clean_transcript}.csv"
            nt_df.to_csv(nt_filename, index=False)
            print(f"   Exported: {nt_filename}")

        if aa_df is not None:
            aa_filename = f"{gene_symbol}_codon_variants_all64_{clean_transcript}.csv"
            aa_df.to_csv(aa_filename, index=False)
            print(f"   Exported: {aa_filename}")

    # Step 4: Create VEP input file
    print(f"\nüéØ STEP 4: Creating VEP input file...")
    vep_variants = create_vep_input_variants(nt_df, aa_df, transcript_id, variant_types)

    if len(vep_variants) == 0:
        print("‚ùå No variants created")
        return None

    # Generate filenames
    clean_transcript = transcript_id.replace('.', '_')
    variant_suffix = variant_types.lower().replace('_', '')
    format_suffix = transcript_format.lower()

    if variant_types == 'codon':
        variant_suffix = 'all64codons'

    if specific_transcript:
        output_prefix = f"{gene_symbol}_{variant_suffix}_VEP_{format_suffix}_{clean_transcript}_custom"
    else:
        output_prefix = f"{gene_symbol}_{variant_suffix}_VEP_{format_suffix}_{clean_transcript}"

    # Create full VEP input file
    full_filename = f"{output_prefix}_full.txt"
    with open(full_filename, 'w') as f:
        for variant in vep_variants:
            f.write(f"{variant}\n")

    print(f"‚úÖ Created VEP input file: {full_filename} ({len(vep_variants):,} variants)")

    results = {
        'gene_symbol': gene_symbol,
        'transcript_id': transcript_id,
        'transcript_format': transcript_format,
        'variant_types': variant_types,
        'specific_transcript': specific_transcript,
        'total_variants': len(vep_variants),
        'nt_df': nt_df,
        'aa_df': aa_df,
        'ref_data': ref_data,
        'full_vep_file': full_filename,
        'chunk_files': [],
        'summary_file': None
    }

    # Step 5: Split into chunks if requested
    if chunk_size and chunk_size > 0:
        print(f"\n‚úÇÔ∏è STEP 5: Splitting into {chunk_size:,} variant chunks...")
        chunk_prefix = f"{output_prefix}_chunk"
        chunk_files, summary_file = split_vep_input_file(full_filename, chunk_size, chunk_prefix)
        results['chunk_files'] = chunk_files
        results['summary_file'] = summary_file

    # Step 6: Show processing instructions
    print(f"\nüìù VEP PROCESSING INSTRUCTIONS:")
    print("=" * 50)

    if chunk_files:
        print(f"1. Upload each chunk file to VEP web interface separately:")
        for chunk_file in chunk_files[:3]:
            print(f"   - {chunk_file}")
        if len(chunk_files) > 3:
            print(f"   - ... and {len(chunk_files) - 3} more")
    else:
        print(f"1. Upload {full_filename} to VEP web interface")

    print(f"2. VEP Settings:")
    print(f"   - Transcript database: {transcript_format.title()}")
    if transcript_format.lower() == 'refseq':
        print(f"   - Select 'RefSeq transcripts' in VEP")
    else:
        print(f"   - Select 'Ensembl transcripts' in VEP (default)")

    if chunk_files:
        print(f"3. Use identical settings for all {len(chunk_files)} chunks")
        print(f"4. Download results as: chunk_01_results.txt, chunk_02_results.txt, etc.")
        print(f"5. Merge results: merge_vep_chunk_results(['chunk_01_results.txt', ...])")
    else:
        print(f"3. Download results when complete")

    # Calculate expected counts
    expected_nt = ref_data['cds_length'] * 3 if variant_types in ['nucleotide', 'both'] else 0
    expected_codon = ref_data['protein_length'] * 64 if variant_types in ['codon', 'both'] else 0

    print(f"\n‚úÖ PIPELINE COMPLETE!")
    print(f"üìä Summary:")
    print(f"   - Gene: {gene_symbol}")
    print(f"   - Transcript: {transcript_id}")
    if variant_types in ['nucleotide', 'both']:
        print(f"   - Nucleotide variants: {len(nt_df) if nt_df is not None else 0:,} (expected: {expected_nt:,})")
    if variant_types in ['codon', 'both']:
        print(f"   - Codon variants: {len(aa_df) if aa_df is not None else 0:,} (expected: {expected_codon:,})")
    print(f"   - Total VEP variants: {len(vep_variants):,}")
    print(f"   - VEP input file: {full_filename}")
    if chunk_files:
        print(f"   - Chunk files: {len(chunk_files)}")

    return results

# Usage examples and help
print("üöÄ COMPLETE VARIANT PIPELINE WITH ALL 64 CODONS READY!")
print("\n" + "=" * 100)
print("USAGE EXAMPLES:")
print("=" * 100)

print("\n1. MSH2 ALL CODONS (RefSeq, 10K chunks) - COMPREHENSIVE:")
print("   results = complete_variant_pipeline('MSH2')")
print("   # Creates synonymous + missense + nonsense variants")

print("\n2. BRCA1 NUCLEOTIDE + ALL CODONS (Ensembl, 5K chunks):")
print("   results = complete_variant_pipeline(")
print("       gene_symbol='BRCA1',")
print("       transcript_format='ensembl',")
print("       variant_types='both',")
print("       chunk_size=5000")
print("   )")

print("\n3. TP53 ALL CODONS ONLY (RefSeq, no chunking):")
print("   results = complete_variant_pipeline(")
print("       gene_symbol='TP53',")
print("       variant_types='codon',")
print("       chunk_size=None")
print("   )")

print("\n4. CUSTOM TRANSCRIPT:")
print("   results = complete_variant_pipeline(")
print("       gene_symbol='PTEN',")
print("       transcript_format='ensembl',")
print("       specific_transcript='ENST00000371953.8'")
print("   )")



üöÄ COMPLETE VARIANT PIPELINE WITH ALL 64 CODONS READY!

USAGE EXAMPLES:

1. MSH2 ALL CODONS (RefSeq, 10K chunks) - COMPREHENSIVE:
   results = complete_variant_pipeline('MSH2')
   # Creates synonymous + missense + nonsense variants

2. BRCA1 NUCLEOTIDE + ALL CODONS (Ensembl, 5K chunks):
   results = complete_variant_pipeline(
       gene_symbol='BRCA1',
       transcript_format='ensembl',
       variant_types='both',
       chunk_size=5000
   )

3. TP53 ALL CODONS ONLY (RefSeq, no chunking):
   results = complete_variant_pipeline(
       gene_symbol='TP53',
       variant_types='codon',
       chunk_size=None
   )

4. CUSTOM TRANSCRIPT:
   results = complete_variant_pipeline(
       gene_symbol='PTEN',
       transcript_format='ensembl',
       specific_transcript='ENST00000371953.8'
   )

KEY FEATURES:
üß¨ ALL 64 CODONS: Includes synonymous, missense, and nonsense variants
üìä VARIANT BREAKDOWN: Shows counts for each variant type
üéØ COMPREHENSIVE COVERAGE: Every possible codon a

In [34]:
results = complete_variant_pipeline(gene_symbol='MSH2', chunk_size=25000, variant_types='codon')

üöÄ COMPLETE MSH2 VARIANT PIPELINE - ALL 64 CODONS!
üìç Gene: MSH2
üìç Transcript format: REFSEQ
üìç Variant types: CODON
üìç Codon coverage: ALL 64 codons (synonymous + missense + nonsense)
üìç Transcript: MANE Select (auto-detected)
üìç Chunk size: 25,000 variants

üîç STEP 1: Getting reference data...
Getting MSH2 gene information...
Found 26 transcripts for MSH2
Found canonical transcript: ENST00000233146
Using known RefSeq ID: NM_000251.3
Fetching coding sequence for ENST00000233146...

‚úÖ Reference data retrieved:
   - Transcript: NM_000251.3
   - CDS length: 2805 bp
   - Protein length: 934 aa

üß¨ STEP 2: Generating variant libraries...
Generating ALL codon variants from 934 aa sequence...
Creating variants with all 64 possible codons at each position...
‚úÖ Generated 59,776 codon variants
   Breakdown:
   - missense: 53,800 variants
   - synonymous: 3,174 variants
   - nonsense: 2,802 variants

üìÅ STEP 3: Exporting variant DataFrames...
   Exported: MSH2_codon_vari

In [4]:
import pandas as pd
import re
from datetime import datetime

def variantvalidator_to_vcf(input_file, output_file, genome_build='hg38'):
    """
    Convert VariantValidator output to VCF format

    Parameters:
    input_file (str): Path to VariantValidator tab-delimited output file
    output_file (str): Path for output VCF file
    genome_build (str): Genome build to use ('hg38' or 'hg37')
    """

    # Read the VariantValidator output, skipping the first two rows
    df = pd.read_csv(input_file, sep='\t', skiprows=2)

    # Select coordinate columns based on genome build
    if genome_build.lower() == 'hg38':
        chr_col = 'GRCh38_CHR'
        pos_col = 'GRCh38_POS'
        ref_col = 'GRCh38_REF'
        alt_col = 'GRCh38_ALT'
        reference = 'GRCh38'
    elif genome_build.lower() == 'hg37':
        chr_col = 'GRCh37_CHR'
        pos_col = 'GRCh37_POS'
        ref_col = 'GRCh37_REF'
        alt_col = 'GRCh37_ALT'
        reference = 'GRCh37'
    else:
        raise ValueError("genome_build must be 'hg38' or 'hg37'")

    # Filter out rows with missing genomic coordinates
    df = df.dropna(subset=[chr_col, pos_col, ref_col, alt_col])

    # Create VCF header
    vcf_header = f"""##fileformat=VCFv4.2
##reference={reference}
##fileDate={datetime.now().strftime('%Y%m%d')}
##source=VariantValidator_to_VCF_converter
##INFO=<ID=GENE,Number=1,Type=String,Description=\"Gene symbol\">
##INFO=<ID=HGVS_C,Number=1,Type=String,Description=\"HGVS cDNA notation\">
##INFO=<ID=HGVS_P,Number=1,Type=String,Description=\"HGVS protein notation\">
##INFO=<ID=TRANSCRIPT,Number=1,Type=String,Description=\"Reference transcript\">
##INFO=<ID=HGNC_ID,Number=1,Type=String,Description=\"HGNC Gene ID\">
##INFO=<ID=TRANSCRIPT_DESC,Number=1,Type=String,Description=\"Transcript description\">"""

    # Add contig information for chromosomes present in data
    chromosomes = df[chr_col].dropna().unique()
    for chrom in sorted(chromosomes):
        if chrom == 'X':
            length = '156040895' if genome_build.lower() == 'hg38' else '155270560'
        elif chrom == 'Y':
            length = '57227415' if genome_build.lower() == 'hg38' else '59373566'
        elif chrom == 'MT' or chrom == 'M':
            length = '16569'
        else:
            # Approximate lengths for autosomes (you may want to use exact values)
            chr_lengths_hg38 = {
                '1': '248956422', '2': '242193529', '3': '198295559', '4': '190214555',
                '5': '181538259', '6': '170805979', '7': '159345973', '8': '145138636',
                '9': '138394717', '10': '133797422', '11': '135086622', '12': '133275309',
                '13': '114364328', '14': '107043718', '15': '101991189', '16': '90338345',
                '17': '83257441', '18': '80373285', '19': '58617616', '20': '64444167',
                '21': '46709983', '22': '50818468'
            }
            chr_lengths_hg37 = {
                '1': '249250621', '2': '242193529', '3': '198022430', '4': '191154276',
                '5': '180915260', '6': '171115067', '7': '159138663', '8': '146364022',
                '9': '141213431', '10': '135534747', '11': '135006516', '12': '133851895',
                '13': '115169878', '14': '107349540', '15': '102531392', '16': '90354753',
                '17': '81195210', '18': '78077248', '19': '59128983', '20': '63025520',
                '21': '48129895', '22': '51304566'
            }

            if genome_build.lower() == 'hg38':
                length = chr_lengths_hg38.get(str(chrom), '100000000')
            else:
                length = chr_lengths_hg37.get(str(chrom), '100000000')

        vcf_header += f"\n##contig=<ID={chrom},length={length}>"

    # Add column header
    vcf_header += "\n#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO"

    # Process each variant
    vcf_records = []

    for idx, row in df.iterrows():
        # Extract basic variant information
        chrom = str(row[chr_col])
        pos = str(int(row[pos_col]))
        ref = row[ref_col]
        alt = row[alt_col]

        # Create INFO field
        info_parts = []

        if pd.notna(row.get('Gene_Symbol')):
            info_parts.append(f"GENE={row['Gene_Symbol']}")

        if pd.notna(row.get('HGVS_transcript')):
            # Clean up HGVS notation by removing any warnings/notes
            hgvs_c = row['HGVS_transcript']
            info_parts.append(f"HGVS_C={hgvs_c}")

            # Extract transcript ID from HGVS notation
            transcript_match = re.match(r'([^:]+):', hgvs_c)
            if transcript_match:
                info_parts.append(f"TRANSCRIPT={transcript_match.group(1)}")

        if pd.notna(row.get('HGVS_Predicted_Protein')):
            info_parts.append(f"HGVS_P={row['HGVS_Predicted_Protein']}")

        if pd.notna(row.get('HGNC_Gene_ID')):
            info_parts.append(f"HGNC_ID={row['HGNC_Gene_ID']}")

        if pd.notna(row.get('Transcript_description')):
            # Clean up description and remove spaces/special characters
            desc = str(row['Transcript_description']).replace(' ', '_').replace(',', ';')
            info_parts.append(f"TRANSCRIPT_DESC={desc}")

        info_field = ';'.join(info_parts) if info_parts else '.'

        # Create VCF record
        vcf_record = f"{chrom}\t{pos}\t.\t{ref}\t{alt}\t.\tPASS\t{info_field}"
        vcf_records.append(vcf_record)

    # Write VCF file
    with open(output_file, 'w') as f:
        f.write(vcf_header + '\n')
        for record in vcf_records:
            f.write(record + '\n')

    print(f"VCF file created: {output_file}")
    print(f"Number of variants: {len(vcf_records)}")
    print(f"Genome build: {reference}")



In [6]:
print("### Step 2:VEP input file into variant validator batch annotator https://variantvalidator.org/service/validate/batch/")

### Step 2: paste VCF file into variant validator batch annotator https://variantvalidator.org/service/validate/batch/


In [7]:
print("### Step 3: Convert VariantValidator output to VCF")

# Call the function to convert VariantValidator output to VCF
variantvalidator_to_vcf('vv_MSH2_1_of_3.txt', 'MSH2_1_of_3.vcf', genome_build='hg38')
variantvalidator_to_vcf('vv_MSH2_2_of_3.txt', 'MSH2_2_of_3.vcf', genome_build='hg38')
variantvalidator_to_vcf('vv_MSH2_3_of_3.txt', 'MSH2_3_of_3.vcf', genome_build='hg38')

### Step 3: Convert VariantValidator output to VCF
VCF file created: MSH2_1_of_3.vcf
Number of variants: 25000
Genome build: GRCh38
VCF file created: MSH2_2_of_3.vcf
Number of variants: 25000
Genome build: GRCh38
VCF file created: MSH2_3_of_3.vcf
Number of variants: 9776
Genome build: GRCh38


In [None]:
print("### Step 4: upload VCF file to Open CRAVAT" https://run.opencravat.org/submit/nocache/index.html with REVEL, EVE, AlphaMissense, BayesDel, MutPred, ClinVar, Gnomad4, Gnomad3, Gnomad)

In [7]:
import pandas as pd
import re
from collections import Counter

def parse_cravat_tsv(file_path):
    """
    Parses a CRAVAT TSV file with a two-level header, combines headers,
    cleans column names, and loads the data into a pandas DataFrame.

    Args:
        file_path (str): The path to the CRAVAT TSV file.

    Returns:
        pd.DataFrame: A DataFrame with cleaned and combined column names.
    """

    # --- Step 1: Read the header lines to process them ---
    # Assuming 192 is the maximum number of columns, based on previous analysis.
    # Create a list of placeholder names to force read_csv to parse all columns.
    num_expected_cols = 192 # This value might need adjustment if the actual file has more or fewer columns
    placeholder_names = [f'col_{i}' for i in range(num_expected_cols)]

    # Read the actual first header line (grouping columns), assuming it's the 6th line (index 5) in the file
    header_line1_raw = pd.read_csv(file_path, sep='\t', skiprows=5, nrows=1, header=None, names=placeholder_names).iloc[0]

    # Read the actual second header line (specific column names), assuming it's the 7th line (index 6) in the file
    header_line2_raw = pd.read_csv(file_path, sep='\t', skiprows=6, nrows=1, header=None, names=placeholder_names).iloc[0]

    # --- Step 2: Combine header lines into a single, flat list of column names ---
    combined_columns = []
    # Initialize last_major_col to handle cases where initial columns might not have a grouping header
    last_major_col = ""

    # Ensure both headers are lists/series of comparable length; fill NaN values in the first header for consistent processing
    # header_line1_filled will now have the same length as header_line2_raw because of 'names' parameter
    header_line1_filled = header_line1_raw.ffill().fillna('').astype(str)

    for i in range(len(header_line2_raw)): # Iterate through the more detailed second header line
        major_col_val = header_line1_filled.iloc[i].strip()
        minor_col_val = header_line2_raw.iloc[i]

        # Update last_major_col if the current major_col_val is not empty
        if major_col_val:
            last_major_col = major_col_val

        # Combine major and minor column names
        if pd.isna(minor_col_val): # If the minor column is NaN, use the last major group
            if last_major_col:
                combined_columns.append(last_major_col)
            elif major_col_val: # Fallback if last_major_col was empty but current major_col has a value
                combined_columns.append(major_col_val)
            else:
                combined_columns.append(f"Unnamed_Col_{i}") # Fallback for truly empty/unnamed columns
        else:
            minor_col_str = str(minor_col_val).strip()
            # If a major group exists and the minor column isn't redundant, combine them
            if last_major_col and minor_col_str != last_major_col and not minor_col_str.startswith(last_major_col + '_'):
                combined_columns.append(f"{last_major_col}_{minor_col_str}")
            else:
                combined_columns.append(minor_col_str)

    # Clean up column names: remove leading/trailing spaces, replace problematic characters
    cleaned_columns = [col.strip().replace(' ', '_').replace('.', '').replace('/', '_').replace('#', '') for col in combined_columns]

    # Handle duplicate column names by appending a suffix (e.g., 'col' and 'col_1')
    name_counts = {}
    final_columns = []

    for col in cleaned_columns:
        base_col = col # The original cleaned name
        count = name_counts.get(base_col, 0) # Get current count for this base name

        # Keep trying new names until a unique one is found
        new_col_name = base_col
        while new_col_name in final_columns:
            count += 1
            new_col_name = f"{base_col}_{count}"

        final_columns.append(new_col_name)
        name_counts[base_col] = count # Update the count for the base name


    # --- Step 3: Load the data with the new column names ---
    # Skip the first 7 rows (5 metadata + 2 header lines) and use the combined column names
    cravat_df = pd.read_csv(file_path, sep='\t', skiprows=7, header=None, names=final_columns, low_memory=False)

    return cravat_df

# Call the new function with the CRAVAT file
cravat_df = parse_cravat_tsv('CRAVAT_MSH2.tsv')

# Display the first 5 rows of the DataFrame
print("First 5 rows of cravat_df using the new function:")
print(cravat_df.head())

# Display the column names
print("\nColumn names of cravat_df using the new function:")
print(cravat_df.columns)


First 5 rows of cravat_df using the new function:
   Variant_Annotation_UID Variant_Annotation_Chrom  \
0                   24803                     chr2   
1                   46763                     chr2   
2                   19586                     chr2   
3                   31243                     chr2   
4                   40485                     chr2   

   Variant_Annotation_Position Variant_Annotation_Ref_Base  \
0                     47478397                           T   
1                     47410289                           G   
2                     47476450                           T   
3                     47482781                           A   
4                     47408458                           A   

  Variant_Annotation_Alt_Base  Variant_Annotation_Variant_Note  \
0                           G                              NaN   
1                           T                              NaN   
2                           G                         

In [9]:
output_tsv_filename = 'cravat_msh2_cleaned.tsv'
cravat_df.to_csv(output_tsv_filename, sep='\t', index=False)

print(f"DataFrame saved to '{output_tsv_filename}'")

DataFrame saved to 'cravat_msh2_cleaned.tsv'
