<a href="https://colab.research.google.com/github/angusli98/ColabBE/blob/main/ColabBE_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ColabBE: automated base editing analysis for ClinVar variants

Written by Angus Li with contributions from Alvin Hsu

Editing predictions based on BE-Hive by Max W. Shen (https://www.crisprbehive.design/), an implementation of machine learning models for base editing outcomes described in:

**Arbab M**, **Shen MW**, Mok B, et al. Determinants of Base Editing Outcomes from Target Library Analysis and Machine Learning. *Cell*. 2020;182(2):463-480.e30. doi:10.1016/j.cell.2020.05.037

Cas9 variant recommendations based on HT-PAMDA data courtesy of Rachel A. Silverstein and Benjamin P. Kleinstiver, as described in:

**Silverstein RA**, Kim N, Kroell AS, et al. Custom CRISPR-Cas9 PAM variants via scalable engineering and machine learning. *Nature*. Published online April 22, 2025. doi:10.1038/s41586-025-09021-y

For coding mutations, ColabBE analyzes base editing strategies that are expected to correct the mutated codon to any codon for the reference amino acid. For non-coding mutations, ColabBE analyzes base editing strategies that are expected to revert the mutated base to the reference sequence.

Source code is available under the [GNU Affero General Public License, version 3](https://www.gnu.org/licenses/agpl-3.0.en.html) from this notebook or at https://github.com/angusli98/ColabBE.

**Instructions**:

- To run, make a copy of this notebook in your own Google Drive (File > Save a copy in Drive). Use this copy for analysis.
- First run the section titled "Mount Google Drive, install/import libraries and define functions" by clicking the "Play" button to the left.
- The optional "Search ClinVar for ClinVar IDs" section can be used to find ClinVar IDs of interest using more general search terms.
- Specify all parameters under "Perform analysis and download results," then start analysis by clicking the "Play" button to the left. A CPU runtime is sufficient.
- After analysis is finished, it is recommended to disconnect and delete the runtime (menu in the top right corner) to avoid wasting resources.

**Results interpretation**:

- When analysis is finished, a zip archive is provided for download, containing all output files. There is a known issue decompressing some files with Windows Explorer; archives can be fully unpacked using [7-Zip](https://www.7-zip.org/) in such cases.
- For each ClinVar ID with potential corrective strategies, two files are provided: `...predicted base editing statistics.xlsx` and `...predicted sequence distributions.xlsx`.
-  `...predicted base editing statistics.xlsx` contains the editor, 20-nucleotide spacer, and 4-nucleotide PAM for each analyzed base editing strategy, as well as the fraction of predicted perfect correction, Z-score, percentile, and predicted target efficiency given the specified `average_editing_efficiency`. For coding mutations, perfect correction is defined as the fraction of **edited** sequencing reads for which the corresponding translated protein sequence is reverted to the same as the reference sequence. For non-coding mutations, perfect correction is defined as the fraction of **edited** sequencing reads for which the DNA sequence exactly matches the reference sequence. Cas9 variant recommendations and corresponding HT-PAMDA scores are also provided based on the PAM sequence.
- `...predicted sequence distributions.xlsx` contains the predicted fraction of sequences resulting from each analyzed editing strategy. Each editor, spacer, and PAM combination is a separate sheet in the file.
- A file named `Summary of editor candidates above threshold.xlsx` is also provided. This is a listing of all editor candidates, with their corresponding ClinVar IDs and variant names, with predicted perfect correction above the cutoff specified in `threshold`.
- A file named `Variants with editor candidates above threshold.xlsx` is provided. This lists all analyzed variants with at least one editor candidate with predicted perfect correction above the cutoff specified in `threshold`.
- All output files are also stored in a timestamped folder on your Google Drive. This can be deleted after analysis is finished if space is limited.

In [1]:
# @title (Required) Mount Google Drive, install/import libraries and define functions { display-mode: "form" }
!pip install biopython
import requests
from Bio import Seq
from Bio import Entrez
import io
import pandas as pd
from datetime import datetime
import os
from google.colab import drive
from google.colab import files
import shutil
import re
drive.mount('/content/gdrive')
from google.colab import data_table
data_table.enable_dataframe_formatter()
!wget https://github.com/angusli98/ColabBE/raw/refs/heads/main/PAM_scores.xlsx
pam_dict = pd.read_excel('PAM_scores.xlsx', index_col = 'PAM').to_dict(orient='index')
from contextlib import contextmanager
import pickle as pkl
import numpy as np
from sklearn.dummy import DummyRegressor
from sklearn._loss import HalfSquaredError
from sklearn.tree._tree import Tree as SKTree
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import GradientBoostingRegressor
import contextlib
!git clone https://github.com/maxwshen/be_predict_bystander.git
!git clone https://github.com/maxwshen/be_predict_efficiency.git
import sys
sys.path.append('/content/be_predict_bystander')
sys.path.append('/content/be_predict_efficiency')
from be_predict_efficiency import predict as be_efficiency_model
from be_predict_bystander import predict as bystander_model
from scipy.special import logit,expit
from scipy.stats import norm

class _PatchedTree(SKTree):
    def __setstate__(self, state):
        nodes = state.get("nodes")
        if nodes is not None and nodes.dtype.names and 'missing_go_to_left' not in nodes.dtype.names:
            expected_dtype = np.dtype([
                ('left_child', '<i8'), ('right_child', '<i8'), ('feature', '<i8'),
                ('threshold', '<f8'), ('impurity', '<f8'), ('n_node_samples', '<i8'),
                ('weighted_n_node_samples', '<f8'), ('missing_go_to_left', 'u1'),
            ])
            fixed_nodes = np.zeros(nodes.shape[0], dtype=expected_dtype)
            for field in nodes.dtype.names:
                fixed_nodes[field] = nodes[field]
            fixed_nodes['missing_go_to_left'] = True
            state['nodes'] = fixed_nodes
        super().__setstate__(state)


class _FixUnpickler(pkl.Unpickler):
    def find_class(self, module, name):
        if module == 'sklearn.ensemble.gradient_boosting':
            module = 'sklearn.ensemble'
        elif module == 'sklearn.tree.tree':
            module = 'sklearn.tree'
        if module == 'sklearn.ensemble' and name == 'LeastSquaresError':
            return HalfSquaredError
        elif module == 'sklearn.ensemble' and name == 'MeanEstimator':
            return DummyRegressor
        elif module == 'sklearn.tree._tree' and name == 'Tree':
            return _PatchedTree
        return super().find_class(module, name)

    def load(self):
        obj = super().load()
        if isinstance(obj, GradientBoostingRegressor):
            estimators = getattr(obj, 'estimators_', None)
            if estimators is not None:
                for tree in estimators.ravel():
                    if not hasattr(tree, 'monotonic_cst'):
                        tree.monotonic_cst = np.array([], dtype=np.int8)
            if not hasattr(obj, '_loss'):
                obj._loss = HalfSquaredError()
            if hasattr(obj, 'init_') and isinstance(obj.init_, DummyRegressor) and \
                    not hasattr(obj.init_, 'strategy') and hasattr(obj.init_, 'mean'):
                mean = obj.init_.mean
                obj.init_.strategy = 'constant'
                obj.init_.constant_ = np.array([[mean]])
                obj.init_.n_outputs_ = 1
        return obj


@contextmanager
def patch_pickle():
    import pickle
    _orig = pickle.load
    pickle.load = lambda f: _FixUnpickler(f).load()
    yield
    pickle.load = _orig

def cvIDtoHg38Coords(cvID, email):
    ASM_MAP = {"NCBI36": "hg18", "GRCh37": "hg19", "GRCh38": "hg38", "T2T-CHM13v2.0": "hs1"}
    Entrez.email = email
    records = Entrez.read(Entrez.esummary(db='clinvar', id=cvId), validate = False)
    try:
      varData = records['DocumentSummarySet']['DocumentSummary'][0]['variation_set'][0]
      spdi = varData["canonical_spdi"]
    except:
      raise RuntimeError(f'Invalid ClinVar ID: {cvID}')
    try:
       [start, ref, mut] = spdi.split(":")[1:]
    except:
      raise RuntimeError("No coordinates available")
    coord = [entry for entry in varData["variation_loc"] if entry['status'] == 'current'][0]
    [chrom, assembly_name] = [coord['chr'], coord['assembly_name']]
    return {'coords': {'assembly': ASM_MAP[assembly_name],
                                  'chrom': "chr" + chrom,
                                  'pos': start },
                        'alleles': { 'vName': varData["variation_name"],
                                   'ref': ref,
                                   'mut': mut },
                        'gene': records['DocumentSummarySet']['DocumentSummary'][0]['gene_sort']}

def coordsToRefSeq(coords):
    if 'start' in coords and 'end' in coords:
        start = coords['start']
        end = coords['end']
    else:
        start = int(coords['pos'])
        end = start + 1
    query = {'track': 'ccdsGene', 'genome': coords['assembly'], 'chrom': coords['chrom'],
                 'start': start, 'end': end}
    resp = requests.get("https://api.genome.ucsc.edu/getData/track", params = query)
    if not resp.ok: raise RuntimeError(f'Failed to get RefSeq annotations from UCSC: {resp.status_code} {resp.text}')
    data = resp.json()
    if data['ccdsGene']: return data['ccdsGene'][0]
    return None

def fetchSequenceFromCoords(coords, contextLen):
    [assembly, chrom, pos] = [coords['assembly'], coords['chrom'], int(coords['pos'])]
    startPos = pos - contextLen
    endPos = pos + contextLen
    query = {'genome': assembly, 'chrom': chrom, 'start': startPos, 'end': endPos}
    resp = requests.get('https://api.genome.ucsc.edu/getData/sequence', params = query)
    if not resp.ok: raise RuntimeError('Failed to fetch sequence from UCSC Genome Browser')
    data = resp.json()
    return data['dna']

def getContextExonTranslations(geneData, target, contextLen):
    target = int(target)
    contextStart = target - contextLen
    contextEnd = target + contextLen
    exonStarts = [int(n) for n in geneData["exonStarts"].split(',') if n.isdigit() and int(n) != 0]
    exonEnds = [int(n) for n in geneData["exonEnds"].split(',') if n.isdigit() and int(n) != 0]
    exonFrames = [int(n) for n in geneData["exonFrames"].split(',') if n.isdigit()]
    if geneData['strand'] == '+':
        exonStarts[0] = geneData['cdsStart']
        exonEnds[-1] = geneData['cdsEnd']
        exonFrames[0] = 0
    else:
        exonStarts[0] = geneData['cdsStart']
        exonEnds[-1] = geneData['cdsEnd']
        exonFrames[-1] = 0
    contentExons = []
    for i in range(len(exonStarts)):
        if exonStarts[i] <= contextEnd and exonEnds[i] >= contextStart:
            exonNumber = i + 1 if geneData['strand'] == '+' else len(exonStarts) - i
            startOffset = max(0, exonStarts[i] - contextStart)
            endOffset = min(exonEnds[i], contextEnd) - contextStart
            adjustedFrame = exonFrames[i]
            if exonStarts[i] < contextStart and geneData['strand'] == '+':
                distanceFromContextStart = contextStart - exonStarts[i]
                adjustedFrame = (exonFrames[i] + distanceFromContextStart) % 3
            elif exonEnds[i] > contextEnd and geneData['strand'] == '-':
                distanceFromContextStart = exonEnds[i] - contextEnd
                adjustedFrame = (exonFrames[i] + distanceFromContextStart) % 3
            contentExons.append({'name': f'{geneData["name2"]} Exon {exonNumber}',
                                 'start': startOffset, 'end': endOffset, 'direction': geneData['strand'],
                                 'frame': (3 - adjustedFrame) % 3})
    return contentExons

def testReversion(mutCodon, wtAA, verbose = True):
    candidates = []
    abe_plus = mutCodon.replace('A', 'G')
    if Seq.translate(abe_plus) == wtAA:
        if verbose: print(f'{mutCodon} could be corrected to {abe_plus}')
        candidates.append(['ABE', '+'])
    cbe_plus = mutCodon.replace('C', 'T')
    if Seq.translate(cbe_plus) == wtAA:
        if verbose: print(f'{mutCodon} could be corrected to {cbe_plus}')
        candidates.append(['BE4', '+'])
    abe_minus = mutCodon.replace('T', 'C')
    if Seq.translate(abe_minus) == wtAA:
        if verbose: print(f'{mutCodon} could be corrected to {abe_minus}')
        candidates.append(['ABE', '-'])
    cbe_minus = mutCodon.replace('G', 'A')
    if Seq.translate(cbe_minus) == wtAA:
        if verbose: print(f'{mutCodon} could be corrected to {cbe_minus}')
        candidates.append(['BE4', '-'])
    return candidates

def isTransition(allele, verbose = True):
    if allele['mut'] == 'A' and allele['ref'] == 'G':
        if verbose: print('A could be corrected to G')
        return ['ABE', '+']
    elif allele['mut'] == 'C' and allele['ref'] == 'T':
        if verbose: print('C could be corrected to T')
        return ['BE4', '+']
    elif allele['mut'] == 'T' and allele['ref'] == 'C':
        if verbose: print('T could be corrected to C')
        return ['ABE', '-']
    elif allele['mut'] == 'G' and allele['ref'] == 'A':
        if verbose: print('G could be corrected to A')
        return ['BE4', '-']
    return False

def behive(seq50,pred_efficiency=0.5):
    with contextlib.redirect_stdout(None):
      with contextlib.redirect_stderr(None):
        pred_d = be_efficiency_model.predict(seq50)
        z = pred_d['Predicted logit score']
        pred_real = expit(1.5 * z + logit(pred_efficiency)) * 100
        pred_df, stats = bystander_model.predict(seq50)
        pred_df = bystander_model.add_genotype_column(pred_df, stats)
        percentile = norm.cdf(z) * 100
    return {'df':pred_df, 'z':z, 'percentile':percentile, 'target_efficiency':pred_real}

def analyze(cvId, email, verbose = True, pred_efficiency = 0.5):
    contextLen = 35
    results = {}
    analysis = {}
    stats_dict = []
    a = cvIDtoHg38Coords(cvId, email)
    vname = a['alleles']['vName']
    print(f'ClinVar ID: {cvId}')
    print(f'Variant name: {vname}')
    if len(a['alleles']['ref']) != 1 or len(a['alleles']['mut']) != 1:
        print(f'The mutation with ClinVar ID {cvId} is not a single nucleotide variant.')
        return[analysis, stats_dict, vname]
    elif a['alleles']['ref'] == a['alleles']['mut']:
        print(f'The mutation with ClinVar ID {cvId} has no base change to correct.')
        return[analysis, stats_dict, vname]
    geneData = coordsToRefSeq(a['coords'])
    seq = fetchSequenceFromCoords(a['coords'], contextLen)
    seq = seq.upper()
    mut = seq[:contextLen] + a['alleles']['mut'] + seq[contextLen+1:]
    if geneData: c = getContextExonTranslations(geneData, a['coords']['pos'], contextLen)
    else: c = {}
    correctExon = None
    for exon in c:
        seqSlice = seq[exon['start']:exon['end']]
        mutSlice = mut[exon['start']:exon['end']]
        if exon['direction'] == '-':
            seqSlice = Seq.reverse_complement(seqSlice)
            mutSlice = Seq.reverse_complement(mutSlice)
        seqTranslation = Seq.translate(seqSlice[exon['frame']:len(seqSlice)-((len(seqSlice)-exon['frame'])%3)])
        mutTranslation = Seq.translate(mutSlice[exon['frame']:len(seqSlice)-((len(seqSlice)-exon['frame'])%3)])
        if verbose:
            print('WT:')
            print(seqSlice)
            print(' ' * exon['frame'] + ''.join([' ' + seqTranslation[aa] + ' ' for aa in range(len(seqTranslation))]))
            print(Seq.complement(seqSlice))
            print('Variant:')
            print(mutSlice)
            print(' ' * exon['frame'] + ''.join([' ' + mutTranslation[aa] + ' ' for aa in range(len(mutTranslation))]))
            print(Seq.complement(mutSlice))
        if contextLen < exon['end'] and contextLen >= exon['start']:
            correctExon = exon
            if exon['direction'] == '+':
                pos = contextLen - exon['start']
            elif exon['direction'] == '-':
                pos = (exon['end'] - 1) - contextLen
            wtCodon = seqSlice[pos - ((pos - exon['frame']) % 3):pos - ((pos - exon['frame']) % 3) + 3]
            mutCodon = mutSlice[pos - ((pos - exon['frame']) % 3):pos - ((pos - exon['frame']) % 3) + 3]
            if len(wtCodon) == 3 and len(mutCodon) == 3:
                wtAA = Seq.translate(wtCodon)
                mutAA = Seq.translate(mutCodon)
                if verbose: print(f'The wild-type codon {wtCodon}, coding for {wtAA}, is mutated to {mutCodon}, coding for {mutAA}')
            else:
                correctExon = None
    if correctExon:
        candidates = testReversion(mutCodon, wtAA, verbose)
        if candidates:
            for candidate in candidates:
                [editor, strand] = candidate
                with contextlib.redirect_stdout(None):
                    with contextlib.redirect_stderr(None):
                        with patch_pickle():
                            be_efficiency_model.init_model(base_editor=editor, celltype='mES')
                            bystander_model.init_model(base_editor=editor, celltype='mES')
                if strand == correctExon['direction']:
                    for start in range(17):
                        seq50 = mut[start:start+50]
                        if verbose: print(f'Sending {seq50} (top strand) to BE-Hive with editor {editor}')
                        results[(cvId, start, editor, strand)] = behive(seq50, pred_efficiency)
                else:
                    mut_rc = Seq.reverse_complement(mut)
                    for start in range(17):
                        seq50 = mut_rc[start:start+50]
                        if verbose: print(f'Sending {seq50} (bottom strand) to BE-Hive with editor {editor}')
                        results[(cvId, start, editor, strand)] = behive(seq50, pred_efficiency)
            seqSlice = seq[correctExon['start']:correctExon['end']]
            if correctExon['direction'] == '-':
                seqSlice = Seq.reverse_complement(seqSlice)
            seqTranslation = Seq.translate(seqSlice[correctExon['frame']:len(seqSlice)-((len(seqSlice)-correctExon['frame'])%3)])
            for entry in results:
                (cvId, start, editor, strand) = entry
                df = results[entry]['df']
                z = results[entry]['z']
                percentile = results[entry]['percentile']
                target_efficiency = results[entry]['target_efficiency']
                perfect_correction = 0
                if strand == correctExon['direction']:
                    spacer = mut[start+20:start+40]
                    pam = mut[start+40:start+44]
                else:
                    mut_rc = Seq.reverse_complement(mut)
                    spacer = mut_rc[start+20:start+40]
                    pam = mut_rc[start+40:start+44]
                params = (editor, spacer, pam)
                analysis[params] = {}
                for p, genotype in zip(df['Predicted frequency'], df['Genotype']):
                    if strand == correctExon['direction']:
                        mut_edited = mut[:start] + genotype + mut[start+50:]
                    else:
                        mut_rc_edited = mut_rc[:start] + genotype + mut_rc[start+50:]
                        mut_edited = Seq.reverse_complement(mut_rc_edited)
                    mutSlice = mut_edited[correctExon['start']:correctExon['end']]
                    if correctExon['direction'] == '-': mutSlice = Seq.reverse_complement(mutSlice)
                    mutTranslation = Seq.translate(mutSlice[correctExon['frame']:len(seqSlice)-((len(seqSlice)-correctExon['frame'])%3)])
                    if seqTranslation == mutTranslation:
                        perfect_correction += p
                    if mutTranslation not in analysis[params]: analysis[params][mutTranslation] = 0
                    analysis[params][mutTranslation] += p
                stats_dict.append({'ClinVar ID': cvId, 'Variant name': vname, 'Editor': editor, 'Spacer': spacer, 'PAM': pam, 'Perfect correction': perfect_correction,
                                     'Z-score': z, 'Percentile': percentile, 'Target efficiency': target_efficiency})
        else: print(f'The coding mutation with ClinVar ID {cvId} is unlikely to be correctable via canonical base editing. Try prime editing instead.')
    else:
        transition = isTransition(a['alleles'])
        if transition:
            [editor, strand] = transition
            with contextlib.redirect_stdout(None):
                with contextlib.redirect_stderr(None):
                    with patch_pickle():
                        be_efficiency_model.init_model(base_editor=editor, celltype='mES')
                        bystander_model.init_model(base_editor=editor, celltype='mES')
            if strand == '+':
                for start in range(17):
                    seq50 = mut[start:start+50]
                    if verbose: print(f'Sending {seq50} (top strand) to BE-Hive with editor {editor}')
                    results[(cvId, start, editor, strand)] = behive(seq50, pred_efficiency)
            elif strand == '-':
                for start in range(17):
                    seq50 = Seq.reverse_complement(mut)[start:start+50]
                    if verbose: print(f'Sending {seq50} (bottom strand) to BE-Hive with editor {editor}')
                    results[(cvId, start, editor, strand)] = behive(seq50, pred_efficiency)
            for entry in results:
                (cvId, start, editor, strand) = entry
                df = results[entry]['df']
                z = results[entry]['z']
                percentile = results[entry]['percentile']
                target_efficiency = results[entry]['target_efficiency']
                perfect_correction = 0
                if strand == '+':
                    spacer = mut[start+20:start+40]
                    pam = mut[start+40:start+44]
                elif strand == '-':
                    mut_rc = Seq.reverse_complement(mut)
                    spacer = mut_rc[start+20:start+40]
                    pam = mut_rc[start+40:start+44]
                params = (editor, spacer, pam)
                analysis[params] = {}
                for p, genotype in zip(df['Predicted frequency'], df['Genotype']):
                    if strand == '+':
                        mut_edited = mut[:start] + genotype + mut[start+50:]
                    elif strand == '-':
                        mut_rc_edited = mut_rc[:start] + genotype + mut_rc[start+50:]
                        mut_edited = Seq.reverse_complement(mut_rc_edited)
                    if seq == mut_edited:
                        perfect_correction += p
                    if mut_edited not in analysis[params]: analysis[params][mut_edited] = 0
                    analysis[params][mut_edited] += p
                stats_dict.append({'ClinVar ID': cvId, 'Variant name': vname, 'Editor': editor, 'Spacer': spacer, 'PAM': pam, 'Perfect correction': perfect_correction,
                                   'Z-score': z, 'Percentile': percentile, 'Target efficiency': target_efficiency})
        else: print(f'The non-coding mutation with ClinVar ID {cvId} is not accesible to canonical base editing. Try prime editing instead.')
    print('Done')
    return [analysis, stats_dict, vname]

def append(
    self,
    other,
    ignore_index: bool = False,
    verify_integrity: bool = False,
    sort: bool = False,
) -> pd.DataFrame:
    """
    Append rows of `other` to the end of caller, returning a new object.

    .. deprecated:: 1.4.0
        Use :func:`concat` instead. For further details see
        :ref:`whatsnew_140.deprecations.frame_series_append`

    Columns in `other` that are not in the caller are added as new columns.

    Parameters
    ----------
    other : DataFrame or Series/dict-like object, or list of these
        The data to append.
    ignore_index : bool, default False
        If True, the resulting axis will be labeled 0, 1, …, n - 1.
    verify_integrity : bool, default False
        If True, raise ValueError on creating index with duplicates.
    sort : bool, default False
        Sort columns if the columns of `self` and `other` are not aligned.

        .. versionchanged:: 1.0.0

            Changed to not sort by default.

    Returns
    -------
    DataFrame
        A new DataFrame consisting of the rows of caller and the rows of `other`.

    See Also
    --------
    concat : General function to concatenate DataFrame or Series objects.

    Notes
    -----
    If a list of dict/series is passed and the keys are all contained in
    the DataFrame's index, the order of the columns in the resulting
    DataFrame will be unchanged.

    Iteratively appending rows to a DataFrame can be more computationally
    intensive than a single concatenate. A better solution is to append
    those rows to a list and then concatenate the list with the original
    DataFrame all at once.

    Examples
    --------
    >>> df = pd.DataFrame([[1, 2], [3, 4]], columns=list('AB'), index=['x', 'y'])
    >>> df
        A  B
    x  1  2
    y  3  4
    >>> df2 = pd.DataFrame([[5, 6], [7, 8]], columns=list('AB'), index=['x', 'y'])
    >>> df.append(df2)
        A  B
    x  1  2
    y  3  4
    x  5  6
    y  7  8

    With `ignore_index` set to True:

    >>> df.append(df2, ignore_index=True)
        A  B
    0  1  2
    1  3  4
    2  5  6
    3  7  8

    The following, while not recommended methods for generating DataFrames,
    show two ways to generate a DataFrame from multiple data sources.

    Less efficient:

    >>> df = pd.DataFrame(columns=['A'])
    >>> for i in range(5):
    ...     df = df.append({'A': i}, ignore_index=True)
    >>> df
        A
    0  0
    1  1
    2  2
    3  3
    4  4

    More efficient:

    >>> pd.concat([pd.DataFrame([i], columns=['A']) for i in range(5)],
    ...           ignore_index=True)
        A
    0  0
    1  1
    2  2
    3  3
    4  4
    """

    return self._append(other, ignore_index, verify_integrity, sort)

def _append(
    self,
    other,
    ignore_index: bool = False,
    verify_integrity: bool = False,
    sort: bool = False,
) -> pd.DataFrame:
    combined_columns = None
    if isinstance(other, (pd.Series, dict)):
        if isinstance(other, dict):
            if not ignore_index:
                raise TypeError("Can only append a dict if ignore_index=True")
            other = pd.Series(other)
        if other.name is None and not ignore_index:
            raise TypeError(
                "Can only append a Series if ignore_index=True "
                "or if the Series has a name"
            )

        index = pd.Index([other.name], name=self.index.name)
        idx_diff = other.index.difference(self.columns)
        combined_columns = self.columns.append(idx_diff)
        row_df = other.to_frame().T
        # infer_objects is needed for
        #  test_append_empty_frame_to_series_with_dateutil_tz
        other = row_df.infer_objects().rename_axis(index.names, copy=False)
    elif isinstance(other, list):
        if not other:
            pass
        elif not isinstance(other[0], pd.DataFrame):
            other = pd.DataFrame(other)
            if self.index.name is not None and not ignore_index:
                other.index.name = self.index.name

    from pandas.core.reshape.concat import concat

    if isinstance(other, (list, tuple)):
        to_concat = [self, *other]
    else:
        to_concat = [self, other]

    result = concat(
        to_concat,
        ignore_index=ignore_index,
        verify_integrity=verify_integrity,
        sort=sort,
    )
    if (
        combined_columns is not None
        and not sort
        and not combined_columns.equals(result.columns)
    ):
        # TODO: reindexing here is a kludge bc union_indexes does not
        #  pass sort to index.union, xref #43375
        # combined_columns.equals check is necessary for preserving dtype
        #  in test_crosstab_normalize
        result = result.reindex(combined_columns, axis=1)
    return result.__finalize__(self, method="append")
pd.DataFrame.append = append
pd.DataFrame._append = _append

Collecting biopython
  Downloading biopython-1.85-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading biopython-1.85-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m46.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: biopython
Successfully installed biopython-1.85
Mounted at /content/gdrive
--2025-09-10 21:07:16--  https://github.com/angusli98/ColabBE/raw/refs/heads/main/PAM_scores.xlsx
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/angusli98/ColabBE/refs/heads/main/PAM_scores.xlsx [following]
--2025-09-10 21:07:17--  https://raw.githubusercontent.com/angusli98/ColabBE/refs/heads/main/PAM_scores.xlsx
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 

In [4]:
# @title (Optional) Search ClinVar for ClinVar IDs { display-mode: "form" }
# @markdown This section can be used to find ClinVar IDs for mutations of interest. It can be run as many times as needed before starting analysis by clicking the "Play" button to the left. Before running for the first time, also execute the "Mount Google Drive and install/import libraries" section above.
email = 'liangus@broadinstitute.org' # @param {type:"string"}
# @markdown - An email address is required for Entrez searches for ClinVar records. We do not store your email.
search_term = "KIF1A T99M" # @param {type:"string"}
max_record_number = 100 # @param {type:"integer"}
if search_term:
  Entrez.email = email
  handle = Entrez.esearch(db='clinvar', retmax = max(1, max_record_number), term=search_term)
  record = Entrez.read(handle)
  cvIds_search = record['IdList']
  post = Entrez.read(Entrez.epost('clinvar', id=','.join(cvIds_search)))
  webenv = post['WebEnv']
  query_key = post['QueryKey']
  records = Entrez.read(Entrez.esummary(db='clinvar', query_key=query_key, WebEnv=webenv), validate = False)
  df = pd.DataFrame.from_dict(records['DocumentSummarySet']['DocumentSummary'])
  df = df[['title','gene_sort','chr_sort','location_sort','obj_type','protein_change','germline_classification']]
  df.insert(0, 'ClinVar ID', cvIds_search)
  df['germline_classification'] = df['germline_classification'].apply(lambda x: x['description'])
  display(df)

Unnamed: 0,ClinVar ID,title,gene_sort,chr_sort,location_sort,obj_type,protein_change,germline_classification
0,432272,NM_001244008.2(KIF1A):c.308A>C (p.Lys103Thr),KIF1A,2,240788106,single nucleotide variant,K103T,Pathogenic
1,30169,NM_001244008.2(KIF1A):c.296C>T (p.Thr99Met),KIF1A,2,240788118,single nucleotide variant,T99M,Pathogenic


In [5]:
# @title Perform analysis and download results { display-mode: "form" }
# @markdown When all parameters have been specified, start analysis by clicking the "Play" button to the left. A CPU runtime is sufficient.
project_name = 'KIF1A T99M' # @param {type:"string"}
# @markdown - A project name is optional but can help distinguish between multiple analyses.
email = 'liangus@broadinstitute.org' # @param {type:"string"}
# @markdown - An email address is required for Entrez searches for ClinVar records. We do not store your email.
mode = 'Enter ClinVar IDs directly' # @param ["Upload a spreadsheet containing ClinVar IDs", "Enter a gene name", "Enter ClinVar IDs directly", "Use all ClinVar search results from above"]
# @markdown - Inputs for modes other than the one selected will be ignored. Excel and CSV files are supported for upload. If applicable, you will be prompted to upload after clicking the "Play" button to the left.
spreadsheet_has_headers = True # @param {type:"boolean"}
# @markdown - If you are uploading a spreadsheet with column headers, select `spreadsheet_has_headers` and enter the name of the column with ClinVar IDs here.
spreadsheet_column = 'VariationID' # @param {type:"string"}
# @markdown - If you are uploading a spreadsheet with column headers, specify the name of the column containing ClinVar IDs here. If your spreadsheet does not contain headers, the first column of the spreadsheet will automatically be used to retrieve the list of ClinVar IDs.
gene = 'KIF1A' # @param {type:"string"}
# @markdown - To search for corrective base editing strategies to all pathogenic/likely pathogenic mutations in a gene documented in ClinVar (may take a long time), use the "Enter a gene name" mode.
clinVar_ids = '30169' # @param {type:"string"}
# @markdown - ClinVar IDs found using the search form above can be entered here. ClinVar IDs can be separated by any mix of spaces, commas, and/or semicolons. This list will be used if the "Enter ClinVar IDs directly" mode is selected.
average_editing_efficiency = 0.5 # @param {type:"slider", min:0.01, max:0.99, step:0.01}
# @markdown - Specify the average editing efficiency of your base editor in `average_editing_efficiency`. This only affects the predicted editing efficiency at each target and will not alter the list of suggested editing strategies.
threshold = 0.6 # @param {type:"slider", min:0.01, max:0.99, step:0.01}
# @markdown - Use `threshold` to specify the cutoff threshold for predicted edit purity required for inclusion in the abbreviated summary of high-potential editor candidates. For coding mutations, purity is measured by exact match to reference translated protein sequence. For non-coding mutations, purity is measured by exact DNA sequence match to reference. This does not alter the full editor analysis that is also provided.
verbose_output = True # @param {type:"boolean"}
# @markdown - Selecting `verbose_output` will provide a running summary of edit strategies being tested. If deselected, only the current ClinVar ID and variant name being tested will be displayed.

if mode == "Enter ClinVar IDs directly":
  cvIds = re.split(r'[ ,;\t]+', clinVar_ids)
elif mode == "Enter a gene name":
  Entrez.email = email
  handle = Entrez.esearch(db='clinvar', retmax = 5000, term=f'((("{gene}"[Gene Name]) AND "single nucleotide variant"[Type of variation]) AND ("clinsig pathogenic"[Filter] OR "clinsig likely path"[Filter]))')
  record = Entrez.read(handle)
  cvIds = record['IdList']
elif mode == "Upload a spreadsheet containing ClinVar IDs":
  uploaded = {}
  while len(uploaded.keys()) != 1:
    print('Please upload a single spreadsheet containing ClinVar IDs to be analyzed.')
    uploaded = files.upload()
  fname = list(uploaded.keys())[0]
  ext = fname.split('.')[-1]
  if ext == 'xlsx' or ext == 'xls':
    if not spreadsheet_has_headers:
      df = pd.read_excel(io.BytesIO(uploaded[fname]), header = None)
      cvIds = df.iloc[:, 0].to_list()
    else:
      df = pd.read_excel(io.BytesIO(uploaded[fname]))
      cvIds = df[spreadsheet_column].to_list()
  elif ext == 'csv':
    if not spreadsheet_has_headers:
      df = pd.read_csv(io.BytesIO(uploaded[fname]), header = None)
      cvIds = df.iloc[:, 0].to_list()
    else:
      df = pd.read_csv(io.BytesIO(uploaded[fname]))
      cvIds = df[spreadsheet_column].to_list()
  else: raise Exception('Unsupported file type')
  cvIds = [str(int(cvId)) for cvId in cvIds if not np.isnan(cvId)]
elif mode == "Use all ClinVar search results from above":
  try: cvIds = cvIds_search
  except: print('No ClinVar search was run in the current session. Run a ClinVar search and try again.')


homedir = '/content/gdrive/MyDrive'
timestamp = datetime.now().strftime('%Y-%m-%d %HH %MM %SS')
os.mkdir(os.path.join(homedir,f'ColabBE {project_name} {timestamp}'))
run = 1
candidates = []
hits = {}
for cvId in cvIds:
    print(f'Run {run} of {len(cvIds)}')
    try:
      [sequence_distributions, stats_dict, vname] = analyze(cvId, email, verbose_output, average_editing_efficiency)
      if stats_dict:
          with pd.ExcelWriter(os.path.join(homedir,f'ColabBE {project_name} {timestamp}',f'{cvId} ({vname.replace(":"," ")}) predicted sequence distributions.xlsx')) as seq_dist:
              for entry in sequence_distributions:
                  (editor, spacer, pam) = entry
                  df_seq = pd.DataFrame.from_dict(sequence_distributions[entry], orient='index', columns=['Frequency'])
                  df_seq.to_excel(seq_dist, sheet_name=f'{editor} {spacer}|{pam}', index_label='Sequence')
          for entry in stats_dict:
              if entry['PAM'] in pam_dict:
                entry['HT-PAMDA score'] = pam_dict[entry['PAM']]['max_score']
                if pam_dict[entry['PAM']]['max_score'] < -3:
                  entry['Recommended Cas9 variant'] = f"Non-SpCas9 (Best SpCas9: {pam_dict[entry['PAM']]['best_PAM']})"
                else:
                  entry['Recommended Cas9 variant'] = pam_dict[entry['PAM']]['best_PAM']
              else:
                entry['HT-PAMDA score'] = 'N/A'
                entry['Recommended Cas9 variant'] = 'Non-SpCas9'
              if entry['Perfect correction'] >= threshold:
                  candidates.append(entry)
                  hits[entry['ClinVar ID']] = entry['Variant name']
          df_stats = pd.DataFrame.from_dict(stats_dict)
          df_stats.to_excel(os.path.join(homedir,f'ColabBE {project_name} {timestamp}',f'{cvId} ({vname.replace(":"," ")}) predicted base editing statistics.xlsx'))
    except RuntimeError as inst:
        print(inst)
    run += 1
df_summary = pd.DataFrame.from_dict(candidates)
df_summary.to_excel(os.path.join(homedir,f'ColabBE {project_name} {timestamp}',f'{project_name} summary of editor candidates above threshold.xlsx'))
df_hits = pd.DataFrame.from_dict(hits, orient='index', columns=['Variant name'])
df_hits.to_excel(os.path.join(homedir,f'ColabBE {project_name} {timestamp}',f'{project_name} variants with editor candidates above threshold.xlsx'), index_label='ClinVar ID')
shutil.make_archive(os.path.join(homedir, f'ColabBE {project_name} {timestamp}'), "zip", os.path.join(homedir, f'ColabBE {project_name} {timestamp}'))
files.download(os.path.join(homedir, f'ColabBE {project_name} {timestamp}.zip'))

Run 1 of 1
ClinVar ID: 30169
Variant name: NM_001244008.2(KIF1A):c.296C>T (p.Thr99Met)
WT:
GGATACAACGTGTGCATCTTCGCCTATGGGCAGACGGGTGCCGGCAAGTCCTACACCATGATGGGCAAGC
 G  Y  N  V  C  I  F  A  Y  G  Q  T  G  A  G  K  S  Y  T  M  M  G  K 
CCTATGTTGCACACGTAGAAGCGGATACCCGTCTGCCCACGGCCGTTCAGGATGTGGTACTACCCGTTCG
Variant:
GGATACAACGTGTGCATCTTCGCCTATGGGCAGATGGGTGCCGGCAAGTCCTACACCATGATGGGCAAGC
 G  Y  N  V  C  I  F  A  Y  G  Q  M  G  A  G  K  S  Y  T  M  M  G  K 
CCTATGTTGCACACGTAGAAGCGGATACCCGTCTACCCACGGCCGTTCAGGATGTGGTACTACCCGTTCG
The wild-type codon ACG, coding for T, is mutated to ATG, coding for M
ATG could be corrected to ACG
Sending GCTTGCCCATCATGGTGTAGGACTTGCCGGCACCCATCTGCCCATAGGCG (top strand) to BE-Hive with editor ABE
Sending CTTGCCCATCATGGTGTAGGACTTGCCGGCACCCATCTGCCCATAGGCGA (top strand) to BE-Hive with editor ABE
Sending TTGCCCATCATGGTGTAGGACTTGCCGGCACCCATCTGCCCATAGGCGAA (top strand) to BE-Hive with editor ABE
Sending TGCCCATCATGGTGTAGGACTTGCCGGCACCCATCTGCCCATAGGCGAAG (top strand) to BE-

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>