In [None]:
#!/usr/bin/env python


## NanoDiP all-in-one Jupyter Notebook

*J. Hench, C. Hultschig, J. Brugger, and S. Frank, Neuropathology, IfP Basel,
2021-2022*

This software is provided free of charge and warranty; by using it you agree to
do this on your own risk. The authors shall not be held liable for any damage
caused by this software. We have assembled this and tested it to the best of
our knowledge.

The purpose of NanoDiP (Nanopore Digital Pathology) is to compare low-coverage
Nanopore sequencing data from natively extracted DNA sequencing runs against a
flexibly adaptable collection of 450K/850K Illumina Infinium Methylation array
data. These data have to be preprocessed into binary beta value files; this
operation is performed in R (uses minfi to read raw array data) and outputs
bindary float files (one per dataset). These beta values files (e.g.,
204949770141_R03C01_betas_filtered.bin) are named according to the array ID
(Sentrix ID) followed by the suffix. A collection of betas_filtered.bin files
can be provided in a static manner and XLSX (Microsoft Excel) tables can be
used to select a subset thereof alongside a user-defined annotation. The
corresponding datasets will be loaded into memory and then serve as the
reference cohort to which the Nanopore data are compared by dimension reduction
(UMAP). This comparison is optimized for speed and low resource consumption so
that it can run on the computer that operates the sequencer. The sequencing run
is initiated through the MinKNOW API by this application. Basecalling and
methylation calling occur as background tasks outside this Jupyter Notebook.
User interaction occurs through a web interface based on CherryPy which has
been tested on Chromium web browser. It is advisable to run it locally, there
are no measures to secure the generated website.

In order to use this application properly please make sure to be somewhat
familiar with Jupyter Notebook. To run the software, press the button called
*restart the kernel, re-run the whole notebook (with dialog)* and confirm
execution. Then, in Chromium Browser, navigate to http://localhost:8080/ and
preferably bookmark this location for convenience. In case of errors, you may
just again click the same button *restart the kernel, re-run the whole notebook
(with dialog)*.

___ ### Technical Details
* Tested with Python 3.7.5; 3.8.8 fails to load minknow_api in jupyter
* notebook.  Verified to run on Ubuntu 18.04/Jetpack on ARMv8 and x86_64 CPUs;
* not tested on Windows and Mac OS. The latter two platforms are unsupported,
* we do not intend to support them.  **CAUTION**: Requires a *patched* version
* of minknow api, file
* `[VENV]/lib/python3.7/site-packages/minknow_api/tools/protocols.py`. Without
* the patch, the generated fast5 sequencing data will be unreadable with f5c or
* nanopolish (wrong compression algorithm, which is the default in the MinKNOW
* backend).

___ ### Headless / Command Line Mode CherryPy, the underlying web server of
NanoDiP allows for headless (command line-based) utilization of the software
besides or instead of browser-based use. Hence, the software may be operated as
a post-hoc analysis pipeline for previously acquired data. This is particularly
useful for benchmarking and validation purposes.

#### Examples: Generate copy number of for sample
**GBM_RTK2_20210311_Testrun_BC06**: `curl
'http://localhost:8080/cnvplot?sampleName=GBM_RTK2_20210311_Testrun_BC06'`

Calculate UMAP plot for sample **GBM_RTK2_20210311_Testrun_BC06** with
reference annotation **AllIDATv2_20210804.xlsx**: `curl
'http://localhost:8080/umapplot?sampleName=GBM_RTK2_20210311_Testrun_BC06&refAnno=AllIDATv2_20210804.xlsx'`

Assemble PDF report for sample **GBM_RTK2_20210311_Testrun_BC06** with
reference annotation **AllIDATv2_20210804.xlsx**: `curl
'http://localhost:8080/makePdf?sampleName=GBM_RTK2_20210311_Testrun_BC06&refAnno=AllIDATv2_20210804.xlsx'`

### Version Details

**30:** UMAP report score / PDF (NanoDiP)

**32:** UMAP report score / PDF (EpiDiP)


In [None]:
# Verify running Python version (should be 3.7.5) and adjust jupyter notebook.
import IPython
import os
from IPython.core.display import display, HTML

In [None]:
# set display witdth to 100%
display(HTML("<style>.container { width:100% !important; }</style>"))
os.system('python --version')


## Multithreading Options
Depending on the number of parallel threads/cores of the underlying hardware,
threading options for multithreaded modules need to be set as
environment-specific parameters. One way to do so is through the *os* module.


In [None]:
# Execution-wide multithreading options, set according to your hardware. Jetson
# AGX: suggest "2" needs to be set before importing other modules that query
# these parameters.
os.environ["NUMBA_NUM_THREADS"] = "2"
os.environ["OPENBLAS_NUM_THREADS"] = "2"
os.environ["MKL_NUM_THREADS"] = "2"


## Modules
This section imports the required modules that should have been installed via
pip. Other package managers have not been tested. To install packages, use the
setup script provided with this software or, alternatively, install them one
by one, ideally in a virtual python environment. Note that the MinKNOW API
requires manual patching after installation with pip.


In [None]:
from minknow_api.acquisition_pb2 import READY, STARTING, PROCESSING, FINISHING
from minknow_api.manager import Manager
from minknow_api.tools import protocols
from pdf2image import convert_from_path
from plotly.io import write_json, from_json
from tqdm import tqdm
from urllib import request
import argparse
import bisect
import cherrypy
import colorsys
import csv
import datetime
import grpc
import hashlib
import inspect
import jinja2
import json
import logging
import numpy as np
import os
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import psutil
import pysam
import re
import shutil
import socket
import subprocess
import sys
import threading
import warnings
import xhtml2pdf.pisa


Configuration
-------------------------------------------------------------------------------
Below are system-specific parameters that may or may not require adaptation.
Many variable names are self-explanatory. The key difference between Nanopore
setups are between devices provided by ONT (MinIT inclusively running the MinIT
distribution on a NVIDIA Jetson developer kit such as the AGX Xavier, GridION)
and the typical Ubuntu-based MinKNOW version on x86_64 computers. The raw data
are written into a `/data` directory on ONT-based devices while they are found
in `/var/lib/minknow/data` on x86_64 installations. Make sure to adapt your
`DATA` accordingly. There are furthermore permission issues and special folders
/ files in the MinKNOW data directory. These files / folders should be excluded
from analysis through `EXCLUDED_FROM_ANALYSIS` so that only real run folders
will be parsed. Finally, the `NANODIP_OUTPUT` is the place in which the
background methylation and alignment process will place its results by
replicating the directory hierarchy of the MinKNOW data location.  It will not
duplicate the data, and these data will be much smaller than raw run data. They
can be placed anywhere in the file tree, but also inside the MinKNOW data path
within a sub-folder. If the latter is the case, make sure to apply appropriate
read/write permissions. Final reports and figures generated by NanoDiP are
written into `NANODIP_REPORTS`.



General
-------------------------------------------------------------------------------


In [None]:
# NanoDiP version number.
__version__ = "31"

In [None]:
# Enables/disables debug mode.
DEBUG_MODE = True

In [None]:
# TODO implement this.
VERBOSITY = 0


Data directories for MinKNOW and NanoDiP output.
-------------------------------------------------------------------------------


In [None]:
# Where MinKNOW places its data.
DATA = "/data"

In [None]:
# Location to write intermediate analysis data, i.e. methylation and alignment
# files.
NANODIP_OUTPUT = os.path.join(DATA, "nanodip_output")

In [None]:
# Location to write reports and figures.
NANODIP_REPORTS = os.path.join(DATA, "nanodip_reports")


Reference data
-------------------------------------------------------------------------------


In [None]:
# Location of all reference data.
REFERENCE_DATA = "/applications/reference_data"

In [None]:
# Location of preprocessed beta values.
BETA_VALUES = os.path.join(REFERENCE_DATA, "betaEPIC450Kmix_bin")

In [None]:
# Location of annotation spreadsheets.
ANNOTATIONS = os.path.join(REFERENCE_DATA, "reference_annotations")

In [None]:
# Spreadsheet containing description of annotation codes (Basel internal).
ANNOTATIONS_ABBREVIATIONS_BASEL = os.path.join(
    ANNOTATIONS, "mc_anno_ifp_basel.csv"
)

In [None]:
# Spreadsheet containing description of annotation codes (TCGA). From:
# https://gdc.cancer.gov/resources-tcga-users/tcga-code-tables/tcga-study-abbreviations
ANNOTATIONS_ABBREVIATIONS_TCGA = os.path.join(
    ANNOTATIONS, "tcga_study_abbreviations.tsv"
)

In [None]:
# Illumina probe names of the 450K array.
ILLUMINA_CG_MAP = os.path.join(
    REFERENCE_DATA,
    "minimap_data/hg19_HumanMethylation450_15017482_v1-2_cgmap.tsv",
)

In [None]:
# Location of binary methylation data and metadata.
REFERENCE_METHYLATION_DATA = os.path.join(REFERENCE_DATA, "EPIC450K")

In [None]:
# Binary methylation file.
REFERENCE_METHYLATION = os.path.join(
    REFERENCE_METHYLATION_DATA, "methylation.bin"
)

In [None]:
# Metadata containing list of CpG names.
REFERENCE_CPG_SITES = os.path.join(REFERENCE_METHYLATION_DATA, "cpg_sites.csv")

In [None]:
# Metadata containing names of specimens.
REFERENCE_SPECIMENS = os.path.join(REFERENCE_METHYLATION_DATA, "specimens.csv")

In [None]:
# Metadata containing methylation matrix dimensions.
REFERENCE_METHYLATION_SHAPE = os.path.join(
    REFERENCE_METHYLATION_DATA, "shape.csv"
)

In [None]:
# Genome reference data containing chromosome lengths and centromere position.
CHROMOSOMES = os.path.join(REFERENCE_DATA, "hg19_cnv", "hg19_chromosomes.tsv")

In [None]:
# Human reference genome in fa format.
REFERENCE_GENOME_FA = os.path.join(REFERENCE_DATA, "minimap_data/hg19.fa")

In [None]:
# Human reference genome in minimap2 mmi format.
REFERENCE_GENOME_MMI = os.path.join(
    REFERENCE_DATA, "minimap_data/hg19_20201203.mmi"
)

In [None]:
# HG19 Gene data downloaded from:
# https://hgdownload.soe.ucsc.edu/goldenPath/hg19/bigZips/genes/hg19.refGene.gtf.gz
GENES_RAW = os.path.join(REFERENCE_DATA, "hg19_cnv", "hg19.refGene.gtf")

In [None]:
# Contains the data from GENES_RAW in simplified form.
GENES = os.path.join(REFERENCE_DATA, "hg19_cnv", "hg19_genes.csv")

In [None]:
# List of clinically important genes.
RELEVANT_GENES = os.path.join(REFERENCE_DATA, "hg19_cnv", "relevant_genes.csv")


File handling
-------------------------------------------------------------------------------


In [None]:
# String patterns in sample names that exclude data from downstream analysis,
# e.g., test runs
ANALYSIS_EXCLUSION_PATTERNS = ["_TestRun_"]

In [None]:
# List of files and folders in DATA to be excluded from analysis.
EXCLUDED_FROM_ANALYSIS = [
    ".Trash-1000",
    "core-dump-db",
    "intermediate",
    "lost+found",
    "minimap_data",
    "nanodip_output",
    "nanodip_reports",
    "nanodip_tmp",
    "non-ont",
    "pings",
    "playback_raw_runs",
    "queued_reads",
    "raw_for_playback",
    "reads",
    "user_scripts",
]

In [None]:
# List of file name sections that identify past runs.
RESULT_ENDING = {
    "cnv_png": "CNVplot.png",
    "ranking": "NanoDiP_ranking.pdf",
    "report": "NanoDiP_report.pdf",
    "umap_all": "UMAP_all.html",
    "umap_top": "UMAP_top.html",
}

In [None]:
# Used file endings.
ENDING = {
    **RESULT_ENDING,
    "aligned_reads": "alignedreads.txt",
    "cnv_bins_json": "CNV_binsplot.json",
    "cnv_html": "CNVplot.html",
    "cnv_json": "CNVplot.json",
    "cnv_pdf": "CNVplot.pdf",
    "cnv_png": "CNVplot.png",
    "cpg_cnt": "cpgcount.txt",
    "genes": "genes.csv",
    "methyl": "methyl_overlap.npy",
    "pie": "pie.png",
    "reads_csv": "reads.csv",
    "relevant_genes": "relevant_genes.csv",
    "sel_ref": "selected_reference.txt",
    "umap_all_html": "UMAP_all.html",
    "umap_all_json": "UMAP_all.json",
    "umap_all_png": "UMAP_all.png",
    "umap_csv": "UMAP.csv",
    "umap_top_json": "UMAP_top.json",
    "umap_top_png": "UMAP_top.png",
    "umap_xlsx": "UMAP.xlsx",
}


Experiments & basecalling
-------------------------------------------------------------------------------


In [None]:
# Beta values above this cutoff will be interpreted as methylated.
METHYLATION_CUTOFF = 0.35

In [None]:
# Barcode strings, currently kit SQK-RBK004.
BARCODE_NAMES = [
    "barcode01",
    "barcode02",
    "barcode03",
    "barcode04",
    "barcode05",
    "barcode06",
    "barcode07",
    "barcode08",
    "barcode09",
    "barcode10",
    "barcode11",
    "barcode12",
]

In [None]:
# Number of reads per file. 400 works well on the Jetson AGX. Higher numbers
# increase batch size and RAM usage, lower numbers use more I/O resources due
# to more frequent reloading of alignment reference.
READS_PER_FILE = "400"

In [None]:
# Number of basecalled bases until run termination occurs.
NEEDED_NUMBER_OF_BASES = 150_000_000

In [None]:
# Paths to binaries for methylation calling.
F5C = "/applications/f5c/f5c"
MINIMAP2 = "/applications/nanopolish/minimap2/minimap2"
SAMTOOLS = "/applications/samtools/samtools"


UMAP/CNV plots, Epidip
-------------------------------------------------------------------------------


In [None]:
# URL to load PDF with CNV plot for a given Sentrix ID (substituted for '%s')
CNV_LINK = (
    "http://s1665.rootserver.io/umapplot01/%s_CNV_IFPBasel_annotations.pdf"
)

In [None]:
# URL to load precalculated UMAP coordinates.
UMAP_LINK = "http://s1665.rootserver.io/umap_links/%s"

In [None]:
# List of UMAP coordinate files hosted on EpiDiP.
EPIDIP_UMAP_COORDINATE_FILES = [
    "UMAP_all_bVals_top_25000.xlsx",
    "UMAP_all_bVals_top_50000.xlsx",
    "UMAP_all_bVals_top_75000.xlsx",
    "gpumap_25000.xlsx",
    "gpumap_50000.xlsx",
    "gpumap_75000.xlsx",
]

In [None]:
# Path to precalculated CNV plotly grid.
CNV_GRID = "/applications/reference_data/hg19_cnv/grid.json"

In [None]:
# Number of reference cases to be shown in subplot including copy
# number profile links (not advisable >200, plotly will become really
# slow)
UMAP_PLOT_TOP_MATCHES = 100

In [None]:
# Controls the browser API used to draw marks. Default "webgl", alternative
# "svg" without proper webgl support (e.g. for Firefox, use "svg"; slower,
# but does not require GPU).
PLOTLY_RENDER_MODE = "webgl"


CherryPy
-------------------------------------------------------------------------------


In [None]:
# Host and port on which the NanoDiP UI will be served
CHERRYPY_HOST = "localhost"
THIS_HOST = "localhost"
CHERRYPY_PORT = 8080

In [None]:
# The web browser favicon file for this application.
BROWSER_FAVICON = "/applications/nanodip/nanodip/static/img/logo.png"


## Utils

Contains general utility functions.


In [None]:
def sanity_check():
    """Checks if reference data is available and generates a warning
    otherwise.
    """
    requested_files = [
        BETA_VALUES,
        ANNOTATIONS_ABBREVIATIONS_BASEL,
        ANNOTATIONS_ABBREVIATIONS_TCGA,
        ILLUMINA_CG_MAP,
        CHROMOSOMES,
        REFERENCE_GENOME_FA,
        REFERENCE_GENOME_MMI,
        GENES_RAW,
        GENES,
        RELEVANT_GENES,
        BROWSER_FAVICON,
        F5C,
        MINIMAP2,
        SAMTOOLS,
    ]
    for f in requested_files:
        if not os.path.exists(f):
            warnings.warn(
                f"File '{f}' not found.\nFunctionality may be restricted.",
                RuntimeWarning,
            )

In [None]:
def extract_referenced_cpgs(sample_methylation,
                            output_overlap,
                            output_overlap_cnt):
    """Extract Illumina CpG sites including methylation status from sample.
    Sex chromosomes are removed.

    Args:
        sample_methylation: methylation file of sample
        output_overlap: file path of CpG overlap
        output_overlap_cnt: file path of CpG overlap count
    """
    reference_cpgs = pd.read_csv(
        ILLUMINA_CG_MAP,
        delimiter="\t",
        names=["ilmnid", "chromosome", "strand", "start"],
    )
    sample_cpgs = pd.read_csv(
        sample_methylation,
        delimiter="\t",
    )
    cpgs = pd.merge(sample_cpgs, reference_cpgs, on=["chromosome", "start"])
    # Extract singelton CpG's
    cpgs = cpgs.loc[cpgs["num_cpgs_in_group"] == 1]
    # Remove duplicates and sex chromosomes
    cpgs = cpgs.loc[
       (~cpgs["chromosome"].isin(["chrX", "chrY"]))
       & (~cpgs["ilmnid"].duplicated())
    ]
    cpgs["is_methylated"] = 0
    cpgs.loc[cpgs["methylated_frequency"] > 0.5, "is_methylated"] = 1
    # Write overlap Data Frame
    cpgs[["ilmnid", "is_methylated"]].to_csv(
        output_overlap, header=False, index=False, sep="\t",
    )
    # Write number of CpG's
    with open(output_overlap_cnt, "w") as f:
        f.write(f"{len(cpgs)}")

In [None]:
def render_template(template_name, **context):
    """Renders jinja2 templates to HTML."""
    loader = jinja2.FileSystemLoader("templates")
    template = jinja2.Environment(
        loader=loader).get_template(template_name)
    return template.render(context)

In [None]:
def url_for(url_func, **args):
    """Transforms a cherrypy function together with argument list to
    url string.
    Example:
        url_for(CherryPyClass.url_func, var0=2, var1=7)
        == "url_func?var0=2&var1=7"
    Raises error if argument names are not correct.
    """
    # Find variable names of url_func.
    default = []
    non_default = []
    sig = inspect.signature(url_func)
    for param in sig.parameters.values():
        if param.default is param.empty:
            non_default.append(param.name)
        else:
            default.append(param.name)
    # Check if variable names are correct.
    for param in args:
        if param not in default + non_default:
            raise ValueError(
                f"'{param}' is not a valid Parameter of {url_func.__name__}."
            )
    url = url_func.__name__
    if args:
        # If args are supplied, enforce that all mandatory variables are
        # contained in args.
        for param in non_default:
            if param not in args and param != "self":
                raise ValueError(
                    f"Parameter '{param}' must be supplied."
                )
        url += "?" + "&".join(
            [f"{key}={value}" for key, value in args.items()]
        )
    return url

In [None]:
def convert_html_to_pdf(source_html, output_file):
    """Create PDF from HTML-string."""
    with open(output_file, "w+b") as f:
        xhtml2pdf.pisa.CreatePDF(source_html, dest=f)

In [None]:
def date_time_string_now():
    """Return current date and time as a string to create time stamps."""
    now = datetime.datetime.now()
    return now.strftime("%Y%m%d_%H%M%S")

In [None]:
def get_runs():
    """Return list of run folders from MinKNOW data directory sorted by
    modification time.
    """
    runs = []
    for f in os.listdir(DATA):
        if f not in EXCLUDED_FROM_ANALYSIS:
            file_path = os.path.join(DATA, f)
            mod_time = os.path.getmtime(file_path)
            if os.path.isdir(file_path):
                runs.append([f, mod_time])
    # Sort based on modification date
    runs.sort(key=lambda x: (x[1], x[0]), reverse=True)
    # Remove date after sorting
    return [x[0] for x in runs]

In [None]:
def get_all_results():
    """Return list of all analysis result files in report directory sorted
    by modification time.
    """
    files = []
    for f in os.listdir(NANODIP_REPORTS):
        for e in RESULT_ENDING.values():
            if f.endswith(e):
                mod_time = os.path.getmtime(
                    os.path.join(NANODIP_REPORTS, f)
                )
                files.append([f, mod_time])
    files.sort(key=lambda x: (x[1], x[0]), reverse=True)
    return [f[0] for f in files]

In [None]:
def files_by_ending(directory, sample_name, ending):
    """Returns a list containing all sample output files with a given
    ending.
    """
    sample_path = os.path.join(directory, sample_name)
    output_files = []
    for root, _, files in os.walk(sample_path):
        output_files.extend(
            [os.path.join(root, f) for f in files if f.endswith(ending)]
        )
    return output_files

In [None]:
def predominant_barcode(sample_name):
    """Returns the predominant barcode within all fast5 files."""
    fast5_files = files_by_ending(DATA, sample_name, ending=".fast5")
    pass_fast5_files = [f for f in fast5_files if "_pass_" in f]
    barcode_hits = []
    for barcode in BARCODE_NAMES:
        barcode_hits.append(
            len([f for f in pass_fast5_files if barcode in f])
        )
    max_barcode_cnt = max(barcode_hits)
    if max_barcode_cnt > 1:
        predominant = BARCODE_NAMES[
            barcode_hits.index(max_barcode_cnt)
        ]
    else:
        predominant = "undetermined"
    return predominant

In [None]:
def reference_annotations():
    """Return list of all reference annotation files (MS Excel XLSX format)."""
    annotations = []
    for r in os.listdir(ANNOTATIONS):
        if r.endswith(".xlsx"):
            annotations.append(r)
    return [a.replace(".xlsx", "") for a in annotations]

In [None]:
def composite_path(directory, *args):
    """Generate composite file-paths of the type
        'directory/arg1_arg2_arg3'
    """
    file_name = "_".join([str(x) for x in args])
    return os.path.join(
        directory,
        file_name,
    )

In [None]:
def discrete_colors(names):
    """Pseudorandom color scheme based on hashed values. Colors
    of methylation classes will be fixed to their name.
        Args:
            names: List of strings.
        Returns:
            Dictionary of color scheme for all string elements.
    """
    color = {}
    for var in set(names):
        hash_str = hashlib.md5(bytes(var, "utf-8")).digest()
        hash1 = int.from_bytes(hash_str[:8], byteorder="big")
        hash2 = int.from_bytes(hash_str[8:12], byteorder="big")
        hash3 = int.from_bytes(hash_str[12:], byteorder="big")
        hue = hash1 % 365
        saturation = hash2 % 91 + 10
        lightness = hash3 % 41 + 30
        # hsl has to be transformed to rgb, since otherwise not all colors
        # are displayed correctly, probably due to plotly bug.
        rgb_frac = colorsys.hls_to_rgb(hue/364, lightness/100, saturation/100)
        rgb = tuple(int(255 * x) for x in rgb_frac)
        color[var] = f"rgb{rgb}"
    return color


## Data

Data containers for sample, reference-data and reference-genome/gene
data.


In [None]:
# Define logger
logger = logging.getLogger(__name__)

In [None]:
def binary_reference_data_exists():
    """Check if the binary form of the reference data has already been
    created.
    """
    return (
        os.path.exists(REFERENCE_METHYLATION_DATA) and
        os.path.exists(REFERENCE_METHYLATION) and
        os.path.exists(REFERENCE_CPG_SITES) and
        os.path.exists(REFERENCE_SPECIMENS) and
        os.path.exists(REFERENCE_METHYLATION_SHAPE)
    )

In [None]:
def make_binary_reference_data(
    input_dir=BETA_VALUES,
    output_dir=REFERENCE_METHYLATION_DATA,
    cutoff=METHYLATION_CUTOFF,
):
    """Create binary methylation files from raw reference data.

    Args:
        input_dir: Directory of raw reference data (beta values)
            as float array-files.
        output_dir: Output directory where binary methylation file
            and metadata will be written.
        cutoff: Empirical cutoff value for methylated
            (round to 1) and unmethylated (round to 0) CpGs.
    """
    print("The binary reference data is generated. Takes 5-10 minutes.")
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)

    specimens = [f for f in os.listdir(input_dir) if f.endswith(".bin")]

    # Get shape parameters of output_data
    specimen_path0 = os.path.join(input_dir, specimens[0])
    with open(specimen_path0, "r") as f:
        beta_values_0 = np.fromfile(f, dtype=float)
    shape = (len(specimens), len(beta_values_0))

    methylation_data = np.empty(shape, dtype=bool)
    for i, specimen in enumerate(tqdm(specimens, desc="Reading reference")):
        specimen_path = os.path.join(input_dir, specimen)
        with open(specimen_path, "rb") as f:
            beta_values = np.fromfile(f, dtype=float)
            methylation_data[i] = np.digitize(
                beta_values,
                bins=[cutoff]
            ).astype(bool)

    # write methylation data as binary
    methylation_file = os.path.join(output_dir, "methylation.bin")
    methylation_data.tofile(methylation_file)

    # write shape parameters
    shape_file = os.path.join(output_dir, "shape.csv")
    with open(shape_file, "w") as f:
        f.write("%s\n %s" % shape)

    # write reference specimens
    specimens_file = os.path.join(output_dir, "specimens.csv")
    specimen_names = [s[:-len("_betas_filtered.bin")] for s in specimens]
    with open(specimens_file, "w") as f:
        f.write("\n".join(specimen_names))

    # write reference cpg sites
    index_file = os.path.join(output_dir, "cpg_sites.csv")
    with open(os.path.join(input_dir, "index.csv")) as f:
        index = f.read()
    with open(index_file, "w") as f:
        f.write(index)

In [None]:
def make_binary_reference_data_if_needed():
    if not binary_reference_data_exists():
        make_binary_reference_data()

In [None]:
class Reference:
    """Container of reference data and metadata."""
    def __init__(self, name):
        make_binary_reference_data_if_needed()
        self.name = name
        with open(REFERENCE_CPG_SITES, "r") as f:
            # Save cpgs as dictionary to allow fast index lookup.
            self.cpg_site_to_index = {
                cpg:i for i, cpg in enumerate(f.read().splitlines())
            }
        self.cpg_sites = self.cpg_site_to_index.keys()
        self.annotation = self.get_annotation()
        with open(REFERENCE_SPECIMENS) as f:
            self.all_specimens = f.read().splitlines()
        # Only consider specimens with annotation entry and binary file.
        annotated_specimens = set(self.annotation["id"]) & set(
            self.all_specimens
        )
        # Save as dictionary to allow fast index lookup.
        specimen_to_index = {s:i for i, s in enumerate(self.all_specimens)}
        self.specimens_index = [
            specimen_to_index[a] for a in annotated_specimens
        ]
        self.specimens_index.sort()
        # Save as dictionary to allow fast methylation class lookup.
        specimen_to_mc = {
            i:mc for i, mc in zip(
                self.annotation.id, self.annotation.methylation_class
            )
        }
        # Annotated specimens sorted by increasing index
        self.specimens = [
            self.all_specimens[i] for i in self.specimens_index
        ]
        self.methylation_class = [
            specimen_to_mc[s] for s in self.specimens
        ]
        self.description = Reference.get_description(
            self.methylation_class
        )

    def get_annotation(self):
        """Reads annotation as csv file from disk, and returns it as
        pd.DataFrame. If csv is missing or file not up to date, annotation
        is read from original excel file (slow) and csv file is written to
        disk.
        """
        path_csv = os.path.join(ANNOTATIONS, self.name + ".csv")
        path_xlsx = os.path.join(ANNOTATIONS, self.name + ".xlsx")
        csv_exists_and_up_to_date = (
            os.path.exists(path_csv) and
            os.path.getmtime(path_csv) > os.path.getmtime(path_xlsx)
        )
        if csv_exists_and_up_to_date:
            return pd.read_csv(path_csv)
        annotation = pd.read_excel(
            path_xlsx,
            header=None,
            names=["id", "methylation_class", "custom_text"],
            engine="openpyxl",
        )
        annotation.to_csv(path_csv, index=False)
        return annotation

    def get_description(methylation_classes):
        """Returns a description of the methylation class using
        a heuristic approach.
        """
        abbr_df = pd.read_csv(ANNOTATIONS_ABBREVIATIONS_BASEL)
        abbr = {
            mc:desc for mc, desc in
            zip(abbr_df.MethylClassStr, abbr_df.MethylClassShortDescr)
        }
        non_trivial_abbr = abbr.copy()
        non_trivial_abbr.pop("-")
        tcga_df = pd.read_csv(ANNOTATIONS_ABBREVIATIONS_TCGA, delimiter="\t")
        tcga = {r[0]:r[1] for _, r in tcga_df.iterrows()}
        def description(mc):
            """Returns description of methylation class {mc}."""
            mc = mc.upper()
            # Exact match
            if mc in abbr:
                return abbr[mc]
            # Else choose longest substring from Basel-Annotations/TCGA
            basel_substring = [a for a in non_trivial_abbr if a in mc]
            basel_substring.sort(key=len)
            tcga_substring = [a for a in tcga if a in mc]
            tcga_substring.sort(key=len)
            # Prefer Basel Annotation
            if (
                basel_substring and (
                    not tcga_substring or
                    len(basel_substring[-1]) >= len(tcga_substring[-1])
                )
            ):
                return abbr[basel_substring[-1]]
            # Else use TCGA Annotation
            if tcga_substring:
                return tcga[tcga_substring[-1]]
            # No proper annotation for "PITUI"
            if mc == "PITUI":
                return "Pituicytoma"
            return ""
        mc_description = [
            description(mc).capitalize() for mc in methylation_classes
        ]
        return mc_description

    def __str__(self):
        """Prints overview of object for debugging purposes."""
        lines = [
            f"Reference object:",
            f"name: '{self.name}'",
            f"annotation:\n{self.annotation}",
            f"cpg_site_to_index:\n{pd.DataFrame(self.cpg_site_to_index.items())}",
            f"cpg_sites:\n{pd.DataFrame(self.cpg_sites)}",
            f"all_specimens:\n{pd.DataFrame(self.all_specimens)}",
            f"specimens :\n{pd.DataFrame(self.specimens)}",
            f"specimens_index\n{pd.DataFrame(self.specimens_index)}",
            f"methylation_class: {pd.DataFrame(self.methylation_class)}",
            f"description: {pd.DataFrame(self.description)}",
        ]
        return "\n".join(lines)

In [None]:
class Genome:
    """Data container for reference genome data."""
    def __init__(self):
        self.chrom = pd.read_csv(CHROMOSOMES, delimiter="\t", index_col=False)
        self.chrom["offset"] = [0] + np.cumsum(self.chrom["len"]).tolist()[:-1]
        self.chrom["center"] = self.chrom["offset"] + self.chrom["len"]//2
        self.chrom["centromere_offset"] = (
            self.chrom["offset"]
            + (self.chrom["centromere_start"] + self.chrom["centromere_end"])
            // 2
        )
        self.length = (
            self.chrom["offset"].iloc[-1] + self.chrom["len"].iloc[-1]
        )
        if not os.path.exists(GENES):
            self.write_genes_csv()
        self.genes = pd.read_csv(GENES, delimiter="\t")

    def __iter__(self):
        """Enables looping over chromosomes."""
        return self.chrom.itertuples()

    def __len__(self):
        return self.length

    def write_genes_csv(self):
        """Write csv gene list with one selected transcript per gene."""
        genes = pd.read_csv(
            GENES_RAW,
            delimiter="\t",
            names=["seqname", "source", "feature", "start", "end",
                   "score", "strand", "frame", "attribute"],
            usecols=["seqname", "feature", "start", "end", "attribute"]
        )
        genes = genes.loc[
            (genes["feature"] == "transcript")
            & (genes["seqname"].isin(self.chrom.name))
        ]
        genes["name"] = genes.attribute.apply(
            lambda x: re.search('gene_name(.*)"(.*)"', x).group(2)
        )
        genes["transcript"] = genes.attribute.apply(
            lambda x: re.search(
                'transcript_id(.*)"(.*)"(.*)gene_name(.*)', x
                ).group(2)
        )
        genes = genes.drop_duplicates(subset=["name", "seqname"], keep="first")
        genes = genes.sort_values("name")
        genes["loc"] = genes.apply(
            lambda z: (
                  z["seqname"]
                + ":"
                + "{:,}".format(z["start"])
                + "-"
                + "{:,}".format(z["end"])
            ),
            axis=1,
        )
        # Make data compatible with pythonic notation
        genes["end"] += 1
        offset = {i.name:i.offset for i in self}
        genes["start"] = genes.apply(
            lambda z: offset[z["seqname"]] + z["start"],
            axis=1,
        )
        genes["end"] = genes.apply(
            lambda z: offset[z["seqname"]] + z["end"],
            axis=1,
        )
        genes["midpoint"] = (genes["start"] + genes["end"]) // 2
        with open(RELEVANT_GENES, "r") as f:
            relevant_genes = f.read().splitlines()
        genes["relevant"] = genes.name.apply(lambda x: x in relevant_genes)
        genes["len"] = genes["end"] - genes["start"]
        genes[["name", "seqname", "start", "end",
               "len", "midpoint", "relevant", "transcript",
               "loc",
        ]].to_csv(GENES, index=False, sep="\t")

    def __str__(self):
        """Prints overview of object for debugging purposes."""
        lines = [
            "Genome object:",
            f"length: {self.length}",
            f"chrom:\n{self.chrom}",
            f"genes:\n{self.genes}",
        ]
        return "\n".join(lines)

In [None]:
def cpg_methyl_from_reads(sample_name):
    """Returns all Illumina methylation CpG-sites with methylation
    status extracted so far.

    Args:
        sample_name: sample name to be analysed

    Returns:
        Pandas Data Frame containing the reads Illumina cpg_sites and
        methylation status.
    """

    cpg_files = files_by_ending(NANODIP_OUTPUT, sample_name,
                                ending="methoverlap.tsv")
    methylation_info = pd.DataFrame(columns=["cpg_site", "methylation"])
    for f in cpg_files:
        # Some fast5 files do not contain any CpGs.
        try:
            cpgs = pd.read_csv(f, delimiter="\t", header=None,
                                names=["cpg_site", "methylation"])
            methylation_info = methylation_info.append(cpgs)
        except FileNotFoundError:
            logger.exception("Empty file encountered, skipping")
    return methylation_info

In [None]:
class Sample:
    """Container of sample data."""
    def __init__(self, name):
        self.name = name
        self.methyl_df = cpg_methyl_from_reads(name)
        self.cpg_overlap = set()
        self.cpg_overlap_index = None
        self.reads = None

    def set_reads(self):
        """Calculate all read start and end positions and save data
        as list to self.reads.
        """
        genome = Genome()
        bam_files = files_by_ending(NANODIP_OUTPUT, self.name, ending=".bam")
        read_positions = []
        for f in bam_files:
            samfile = pysam.AlignmentFile(f, "rb")
            for chrom in genome:
                for read in samfile.fetch(chrom.name):
                    read_positions.append([
                        read.reference_start + chrom.offset,
                        # reference_end equals first position after alignment
                        # consistent with python notations.
                        read.reference_end + chrom.offset,
                    ])
                    assert (read.reference_length != 0), "Empty read"
        self.reads = read_positions

    def set_cpg_overlap(self, reference):
        """Sets CpG overlap and cpg-site-index between sample
        and reference.

        This is necessary since some probes have been skipped from the
        reference set, e.g. sex chromosomes.
        """
        self.cpg_overlap = set(self.methyl_df["cpg_site"]).intersection(
            reference.cpg_sites)
        self.cpg_overlap_index = [
            reference.cpg_site_to_index[f] for f in self.cpg_overlap
        ]
        self.cpg_overlap_index.sort()

    def __str__(self):
        """Prints overview of object for debugging purposes."""
        lines = [
            f"Sample object:",
            f"name: '{self.name}':",
            f"methyl_df: {self.methyl_df}",
            f"cpg_overlap: {pd.DataFrame(self.cpg_overlap)}",
            f"cpg_overlap_index: {pd.DataFrame(self.cpg_overlap_index)}",
            f"reads: {pd.DataFrame(self.reads)}",
        ]
        return "\n".join(lines)

In [None]:
def reference_methylation_from_index(reference_index, cpg_index):
    """Extract and return (reference-specimen x CpG-site) methylation
    submatrix from reference data.

    Args:
        reference_index: Indices of references to extract from reference
            data.
        cpg_index: Indices of Illumina CpG's to extract data.

    Returns:
        Numpy array matrix containing submatrix of reference methylation
        with rows=reference_index and columns=cpg_index.
    """
    make_binary_reference_data_if_needed()
    shape = [len(reference_index), len(cpg_index)]
    delta_offset = np.diff(reference_index, prepend=-1) - 1
    reference_submatrix = np.empty(shape, dtype=bool)
    with open(REFERENCE_METHYLATION_SHAPE, "r") as f:
        number_of_cpgs = [int(s) for s in f.read().splitlines()][1]
    with open(REFERENCE_METHYLATION, "rb") as f:
        for i, d in enumerate(delta_offset):
            reference_submatrix[i] = np.fromfile(
                f, dtype=bool, offset=d*number_of_cpgs, count=number_of_cpgs
            )[cpg_index]
    return reference_submatrix

In [None]:
def get_reference_methylation(sample, reference):
    """Extract and return (reference-specimen x CpG-site) methylation
    matrix from overlap of sample CpG's with annotated reference data.
    """
    if not sample.cpg_overlap:
        raise ValueError("CpG overlap is empty")
    reference_index = reference.specimens_index
    cpg_index = sample.cpg_overlap_index
    return reference_methylation_from_index(reference_index, cpg_index)

In [None]:
def get_sample_methylation(sample, reference):
    """Calculate and return sample methylation from reads.

    Args:
        sample: Sample to be analysed.
        reference: Reference used to determine CpG overlap with sample.
    Returns:
        Numpy array containing sample methylation on CpG overlap
            sites.
    """
    if not sample.cpg_overlap:
        raise ValueError("CpG overlap is empty")
    sample_methylation = np.full(
        len(reference.cpg_sites), 0, dtype=bool
    )
    sample_mean_methylation = sample.methyl_df.groupby(
        "cpg_site",
        as_index=False).mean()

    for _, row in sample_mean_methylation.iterrows():
        cpg = row["cpg_site"]
        if cpg in sample.cpg_overlap:
            i = reference.cpg_site_to_index[cpg]
            sample_methylation[i] = row["methylation"] > METHYLATION_CUTOFF
    return sample_methylation[sample.cpg_overlap_index]


## Plots

Functions for creating Copy Number Variation plot.
Functions for creating methylation UMAP plot.


In [None]:
# Define logger
logger = logging.getLogger(__name__)

In [None]:
def get_bin_edges(n_bins, genome):
    """Returns sequence of {n_bins} equal sized bins on chromosomes used
    by numpy histogram. Every bin is limited to one chromosome.
    """
    edges = np.linspace(0, len(genome), num=n_bins + 1).astype(int)
    # limit bins to only one chromosome
    for chrom_edge in genome.chrom.offset:
        i_nearest = np.abs(edges - chrom_edge).argmin()
        edges[i_nearest] = chrom_edge
    return edges

In [None]:
def get_cnv(read_positions, genome):
    """Return CNV.
    Args:
        read_positions: List of reads in the form [start, end].
        genome: Reference genome.
    Returns:
        bin_midpoints: x-values
        copy_numbers: y-values
    """
    expected_reads_per_bin = 30
    n_bins = len(read_positions)//expected_reads_per_bin
    read_start_positions = [i[0] for i in read_positions]
    copy_numbers, bin_edges = np.histogram(
        read_start_positions,
        bins=get_bin_edges(n_bins, genome),
        range=[0, len(genome)],
    )
    bin_midpoints = (bin_edges[1:] + bin_edges[:-1])/2
    return bin_midpoints, copy_numbers

In [None]:
def cnv_grid(genome):
    """Returns chromosome grid layout for CNV Plot as plotly object and
    saves it on disk. If available grid is directly read from disk.
    """
    # Check if grid exists and return if available.
    if os.path.exists(CNV_GRID):
        with open(CNV_GRID, "r") as f:
            grid = from_json(f.read())
        return grid

    grid = go.Figure()
    grid.update_layout(
        coloraxis_showscale=False,
        xaxis = dict(
            linecolor="black",
            linewidth=1,
            mirror=True,
            range=[0, len(genome)],
            showgrid=False,
            ticklen=10,
            tickmode="array",
            ticks="outside",
            tickson="boundaries",
            ticktext=genome.chrom.name,
            tickvals=genome.chrom.center,
            zeroline=False,
        ),
        yaxis = dict(
            linecolor="black",
            linewidth=1,
            mirror=True,
            showline=True,
        ),
        template="simple_white",
    )
    # Vertical line: centromere.
    for i in genome.chrom.centromere_offset:
        grid.add_vline(x=i, line_color="black", line_dash="dot", line_width=1)
    # Vertical line: chromosomes.
    for i in genome.chrom.offset.tolist() + [len(genome)]:
        grid.add_vline(x=i, line_color="black", line_width=1)
    # Save to disk
    grid.write_json(CNV_GRID)
    return grid

In [None]:
def cnv_plot_from_data(data_x, data_y, expt_y, sample_name, read_num, genome):
    """Create CNV plot from CNV data.

    Args:
        data_x: x-Values to plot.
        data_y: y-Values to plot.
        expt_y: expected y-Value.
        sample_name: Name of sample.
        read_num: Number of read reads.
        genome: Reference Genome.
    """
    grid = cnv_grid(genome)
    # Expected value: draw horizontal line.
    grid.add_hline(y=expt_y, line_color="black", line_width=1)
    plot = px.scatter(
        x=data_x,
        y=data_y,
        labels={
            "x":f"Number of mapped reads: {read_num}",
            "y":f"Copy numbers per {round(len(genome)/(len(data_x)*1e6), 2)} MB"
        },
        title=f"Sample ID: {sample_name}",
        color=data_y,
        range_color=[expt_y*0, expt_y*2],
        color_continuous_scale="Portland",
        render_mode=PLOTLY_RENDER_MODE,
    )
    plot.update_traces(hovertemplate="Copy Numbers = %{y} <br>")
    plot.update_layout(grid.layout, yaxis_range = [-0.5, 2*expt_y])
    return plot

In [None]:
def number_of_reads(sorted_read_start_pos, interval):
    """Return the number of starting sequences within interval. Reads must
    be sorted in ascending order.
    """
    left, right = interval
    i_left = bisect.bisect_left(sorted_read_start_pos, left)
    i_right = bisect.bisect_left(sorted_read_start_pos, right)
    return len(sorted_read_start_pos[i_left:i_right])

In [None]:
def cnv_plot(sample, bin_midpoints, cnv, genome):
    """Create a genome-wide copy number plot and save data on dist."""
    logger.info("CNVP start")
    logger.info(sample)
    logger.info("Bin midpoints:\n%s", bin_midpoints)
    logger.info("CNV:\n%s", cnv)

    avg_read_per_bin = len(sample.reads) // len(bin_midpoints)

    plot = cnv_plot_from_data(
        data_x=bin_midpoints,
        data_y=cnv,
        expt_y = avg_read_per_bin,
        sample_name=sample.name,
        read_num=len(sample.reads),
        genome=genome,
    )
    logger.info("CNVP done")
    return plot

In [None]:
class CNVData:
    """CNV data container and methods for invoking CNV plot algorithm."""
    genome = Genome()
    def __init__(self, sample_name):
        self.sample = Sample(sample_name)
        self.plot = None
        self.plot_json = None
        self.genes = None
        self.relevant_genes = None
        self.bin_midpoints = None
        self.cnv = None

    def path(self, ending):
        """Returns generic path with corresponding ending."""
        return composite_path(
            NANODIP_REPORTS, self.sample.name, ENDING[ending]
        )

    def files_on_disk(self):
        """Checks if files are on disk."""
        return (
            os.path.exists(self.path("cnv_json")) and
            os.path.exists(self.path("genes"))
        )

    def read_from_disk(self):
        """Reads files from disk."""
        with open(self.path("cnv_json"), "r") as f:
            self.plot_json = f.read()
        self.plot = from_json(self.plot_json)
        self.genes = pd.read_csv(self.path("genes"))

    def make_cnv_plot(self):
        """Generates CNV plot and saves to disk."""
        self.sample.set_reads() # time consumption 2.5s
        self.bin_midpoints, self.cnv = get_cnv(
            self.sample.reads,
            CNVData.genome,
        )
        if len(self.bin_midpoints) == 0:
            raise ValueError("no points to plot")
        self.plot = cnv_plot(
            sample=self.sample,
            bin_midpoints=self.bin_midpoints,
            cnv=self.cnv,
            genome=CNVData.genome,
        )
        self.plot_json = self.plot.to_json()
        self.genes = self.gene_cnv()
        self.relevant_genes = self.genes.loc[self.genes.relevant]
        self.save_to_disk()

    def save_to_disk(self):
        """Saves attributes to disk."""
        self.plot.write_html(
            self.path("cnv_html"),
            config=dict({"scrollZoom": True}),
        )
        write_json(self.plot, self.path("cnv_json"))
        # time consuming operation (1.96s)
        self.plot.write_image(
            self.path("cnv_png"), width=1280, height=720, scale=3,
        )
        with open(self.path("aligned_reads"), "w") as f:
            f.write(f"{len(self.sample.reads)}")
        with open(self.path("reads_csv"), "w") as f:
            write = csv.writer(f)
            write.writerows(self.sample.reads)
        self.genes.to_csv(self.path("genes"), index=False)
        self.relevant_genes.to_csv(self.path("relevant_genes"), index=False)

    def gene_cnv(self):
        """Returns pandas DataFrame containing copy number variation
        for all genes in reference genome.
        """
        genes = CNVData.genome.genes
        genes["interval"] = list(zip(genes.start, genes.end))
        read_start_pos = [i[0] for i in self.sample.reads]
        read_start_pos.sort()
        genes["cn_obs"] = genes.interval.apply(
            lambda z: number_of_reads(read_start_pos, z)
        )
        bin_size = len(CNVData.genome)/(len(self.bin_midpoints))
        genes["cn_per_bin"] = genes.apply(
            lambda z: z["cn_obs"]/z["len"] * bin_size, # TODO auto draw extreme values
            axis=1,
        )
        genes["cn_exp"] = genes.apply(
            lambda z: len(self.sample.reads)*z["len"]/len(CNVData.genome),
            axis=1,
        )
        genes["cn_obs_exp_ratio"] = genes.apply(
            lambda z: z["cn_obs"]/z["cn_exp"],
            axis=1,
        )
        genes = genes.sort_values(by="cn_obs", ascending=False)
        return genes

    def get_gene_positions(self, genes):
        """Returns sub-DataFrame containing the genes of the list {genes}.
        """
        gene_pos = self.genes.loc[self.genes.name.isin(genes)]
        return gene_pos

    def plot_cnv_and_genes(self, gene_names):
        """Returns json plot of the CNV plot including the CN of all
        genes in the list {gene_names}.
        """
        genes = self.get_gene_positions(gene_names)
        plot = go.Figure(self.plot)
        plot.add_trace(
            go.Scatter(
                customdata=genes[[
                    "name",          # 0
                    "loc",           # 1
                    "transcript",    # 2
                    "len",           # 3
                ]],
                hovertemplate=(
                    "Copy numbers = %{y} <br>"
                    "<b> %{customdata[0]} </b> <br>"
                    "%{customdata[1]} "
                    "(hg19 %{customdata[2]}) <br>"
                    "%{customdata[3]} bases <br>"
                ),
                name="",
                marker_color="rgba(0,0,0,1)",
                mode="markers+text",
                marker_symbol="diamond",
                textfont_color="rgba(0,0,0,1)",
                showlegend=False,
                text=genes.name,
                textposition="top center",
                x=genes.midpoint,
                y=genes.cn_obs,
            ))
        return plot.to_json()

In [None]:
def umap_plot_from_data(sample, reference, umap_df, close_up):
    """Create and return umap plot from UMAP data.

    Args:
        sample: Sample data.
        reference: Reference data.
        umap_df: pandas data frame containing UMAP matrix and
            attributes. First row,w corresponds to sample.
        close_up: Bool to indicate if only top matches should be plotted.
    Returns:
        UMAP plot as plotly object.
    """
    umap_sample = umap_df.iloc[0]
    umap_title = (
        f"UMAP for {sample.name} <br><sup>Reference: {reference.name} "
        f"({len(reference.specimens)} cases), "
        f"{len(sample.cpg_overlap)} CpGs </sup>"
    )
    if close_up:
        umap_title = "Close-up " + umap_title
    methyl_classes = umap_df.methylation_class[1:].to_list()
    methyl_classes.sort()
    umap_plot = px.scatter(
        umap_df,
        x="x",
        y="y",
        labels={"x":"UMAP 0", "y":"UMAP 1", "methylation_class":"WHO class"},
        title=umap_title,
        color="methylation_class",
        color_discrete_map={
            sample.name: "#ff0000",
            **discrete_colors(methyl_classes),
        },
        hover_name="id",
        category_orders={"methylation_class": [sample.name] + methyl_classes},
        hover_data=["description"],
        render_mode=PLOTLY_RENDER_MODE,
        template="simple_white",
    )
    umap_plot.add_annotation(
        x=umap_sample["x"],
        y=umap_sample["y"],
        text=sample.name,
        showarrow=True,
        arrowhead=1,
    )
    umap_plot.update_yaxes(
        scaleanchor = "x",
        scaleratio = 1,
        mirror=True,
    )
    umap_plot.update_xaxes(
        mirror=True,
    )

    # If close-up add hyperlinks for all references and draw circle
    if close_up:
        umap_plot.update_traces(marker=dict(size=5))
        # Add hyperlinks
        for _, row in umap_df.iloc[1:].iterrows():
            url = CNV_LINK % row["id"]
            umap_plot.add_annotation(
                x=row["x"],
                y=row["y"],
                text=f"<a href='{url}' target='_blank'>&nbsp;</a>",
                showarrow=False,
                arrowhead=1,
            )
        # Draw circle
        radius = umap_df["distance"].iloc[-1]
        umap_plot.add_shape(
            type="circle",
            x0=umap_sample["x"] - radius,
            y0=umap_sample["y"] - radius,
            x1=umap_sample["x"] + radius,
            y1=umap_sample["y"] + radius,
            fillcolor="rgba(0,0,0,0)",
            line_color="black",
            line_width=1.0,
        )
    return umap_plot

In [None]:
def dimension_reduction(sample, reference):
    """Performs UMAP 2d-dimension reduction.

    Args:
        sample: Sample to analyse.
        reference: Reference data which are compared with sample.
    Returns:
        methyl_overlap: Methlation matrix with rows corresponding to sample
            (first row) and reference specimens (remaining rows) and columns
            corresponding to CpGs present both in the reference specimens
            and the sample data set.
        umap_df: UMAP DataFrame corresponding to dimension reduction of
            methyl_overlap.
    """
    import umap #Moved here due to long loading time (13.5s)

    logger.info("Start UMAP for %s / %s.", sample.name, reference.name)
    logger.info(reference)

    # Calculate overlap of sample CpG's with reference CpG's (some probes
    # have been skipped from the reference set, e.g. sex chromosomes).
    sample.set_cpg_overlap(reference)
    logger.info(sample)

    if not sample.cpg_overlap:
        logger.info("UMAP done. No Matrix created, no overlapping data.")
        raise ValueError("Sample has no overlapping CpG's with reference.")

    # Extract reference and sample methylation according to CpG overlap.
    reference_methylation = get_reference_methylation(sample, reference)
    logger.info("Reference methylation extracted:\n%s", reference_methylation)
    sample_methylation = get_sample_methylation(sample, reference)
    logger.info("Sample methylation extracted:\n%s", sample_methylation)

    # Calculate UMAP Nx2 Matrix. Time intensive (~1min).
    methyl_overlap = np.vstack([sample_methylation, reference_methylation])
    logger.info("UMAP algorithm initiated.")
    umap_2d = umap.UMAP(verbose=True).fit_transform(methyl_overlap)
    logger.info("UMAP algorithm done.")

    # Free memory
    del reference_methylation
    del sample_methylation

    umap_sample = umap_2d[0]
    umap_df = pd.DataFrame({
        "distance": [np.linalg.norm(z - umap_sample) for z in umap_2d],
        "methylation_class": [sample.name] + reference.methylation_class,
        "description":  ["Analysis sample"] + reference.description,
        "id": [sample.name] + reference.specimens,
        "x": umap_2d[:,0],
        "y": umap_2d[:,1],
    })

    logger.info("UMAP done. Matrix created.")
    return (methyl_overlap, umap_df)

In [None]:
def pie_chart(umap_data):
    """Returns plotly pie chart of the methylation classes of the nearest UMAP
    neighbors (according to euclidean 2d-distance) of the sample.
    """
    umap_neighbors = umap_data.umap_df.sort_values(by="distance")[
        1 : UMAP_PLOT_TOP_MATCHES + 1
    ]

    num_per_class = (
        umap_neighbors.groupby(["methylation_class"])
        .size()
        .reset_index(name="counts")
    )
    sample = umap_data.sample
    reference = umap_data.reference
    plot = px.pie(
        num_per_class,
        values="counts",
        names="methylation_class",
        color="methylation_class",
        color_discrete_map=discrete_colors(num_per_class.methylation_class),
        title=(
            f"Nearest UMAP neighbors for {umap_data.sample.name} <br><sup>"
            f"Reference: {reference.name} "
            f"({len(reference.specimens)}"
            f"cases), {len(sample.cpg_overlap)} CpGs</sup>"
        ),
        template="simple_white",
    )
    return plot

In [None]:
class UMAPData:
    """UMAP data container and methods for invoking UMAP plot algorithm."""
    def __init__(self, sample_name, reference_name):
        self.sample = Sample(sample_name)
        self.reference = Reference(reference_name)
        self.methyl_overlap = None
        self.umap_df = None
        self.cu_umap_df = None
        self.plot = None
        self.plot_json = None
        self.cu_plot = None
        self.cu_plot_json = None
        self.pie_chart = None

    def path(self, ending):
        """Returns generic path with corresponding ending."""
        return composite_path(
            NANODIP_REPORTS,
            self.sample.name, self.reference.name, ENDING[ending],
        )

    def make_umap_plot(self):
        """Invoke UMAP plot algorithm and save files to disk."""
        self.methyl_overlap, self.umap_df = dimension_reduction(
            self.sample, self.reference
        )
        self.draw_scatter_plots()
        self.draw_pie_chart()
        self.save_to_disk()

    def draw_pie_chart(self):
        """Draw pie chart of nearest UMAP neighbors."""
        self.pie_chart = pie_chart(self)

    def draw_scatter_plots(self):
        """Draws UMAP scatter plot with close-up plot from data."""
        self.plot = umap_plot_from_data(
            self.sample,
            self.reference,
            self.umap_df,
            close_up=False,
        )
        logger.info("UMAP plot generated.")
        self.cu_umap_df = self.umap_df.sort_values(
            by="distance"
        )[:UMAP_PLOT_TOP_MATCHES + 1]
        self.cu_plot = umap_plot_from_data(
            self.sample,
            self.reference,
            self.cu_umap_df,
            close_up=True,
        )
        logger.info("UMAP close-up plot generated.")

        # Convert to json.
        self.plot_json = self.plot.to_json()
        self.cu_plot_json = self.cu_plot.to_json()

    def save_to_disk(self):
        """Saves attributes to disk."""
        # Save methylation matrix.
        np.save(self.path("methyl"), self.methyl_overlap)

        # Save UMAP Matrix.
        self.umap_df.to_csv(self.path("umap_csv"), index=False)

        # Write UMAP plot to disk.
        file_path = self.path("umap_all")
        self.plot.write_html(file_path, config=dict({"scrollZoom": True}))
        self.plot.write_json(file_path[:-4] + "json")
        self.plot.write_image(file_path[:-4] + "png") # Time consumption 1.8s

        # Write UMAP close-up plot to disk.
        file_path = self.path("umap_top")
        self.cu_plot.write_html(file_path, config=dict({"scrollZoom": True}))
        self.cu_plot.write_json(file_path[:-4] + "json")
        self.cu_plot.write_image(
            file_path[:-4] + "png", width=450, height=400, scale=3
        )   # Time consumption 0.9s

        # Write pie chart to disk.
        self.pie_chart.write_image(
            self.path("pie"), width=450, height=400, scale=3
        )

        # Save close up ranking report.
        self.save_ranking_report() # Time consumption 0.4s

    def save_ranking_report(self):
        """Save pdf containing the nearest neighbours from umap analyis."""
        rows = [row for _, row in self.cu_umap_df.iterrows()]
        html_report = render_template("umap_report.html", rows=rows)
        convert_html_to_pdf(html_report, self.path("ranking"))
        with open(self.path("cpg_cnt"), "w") as f:
            f.write(f"{len(self.sample.cpg_overlap)}")

    def files_on_disk(self):
        """Check if files are on disk."""
        return (
            os.path.exists(self.path("methyl")) and
            os.path.exists(self.path("umap_all_json")) and
            os.path.exists(self.path("umap_top_json")) and
            os.path.exists(self.path("umap_csv"))
        )

    def read_from_disk(self):
        """Read plot data from disk."""
        # Read UMAP plot as json.
        with open(self.path("umap_all_json"), "r") as f:
            self.plot_json = f.read()

        # Read UMAP close-up plot as json.
        with open(self.path("umap_top_json"), "r") as f:
            self.cu_plot_json = f.read()

        # Read Methylation Matrix.
        self.methyl_overlap = np.load(self.path("methyl"), allow_pickle=True)

        # Read UMAP Matrix.
        self.umap_df = pd.read_csv(self.path("umap_csv"))

    def read_precalculated_umap_matrix(self, umap_matrix):
        """Reads precalculated UMAP matrix from disk."""
        path_xlsx = composite_path(
            NANODIP_REPORTS,
            self.sample.name,
            umap_matrix.replace(".xlsx", ""),
            ENDING["umap_xlsx"],
        )
        precalculated_umap = pd.read_excel(
            path_xlsx,
            header=0,
            names=["id", "x", "y"],
            engine="openpyxl",
        )   # TODO better use csv. Time consumption 4.4s
        reference_df = pd.DataFrame(
            zip(
                self.reference.specimens,
                self.reference.methylation_class,
                self.reference.description,
            ),
            columns=["id", "methylation_class", "description"],
        )
        if not self.sample.name in reference_df.id.values:
            reference_df.loc[len(reference_df.index)] = [
                self.sample.name, self.sample.name, "Analysis Sample"
            ]
        self.umap_df = pd.merge(precalculated_umap, reference_df, on = "id")
        umap_sample = self.umap_df[["x", "y"]].loc[
            self.umap_df.id==self.sample.name
        ].values
        self.umap_df["distance"] = [
            np.linalg.norm([z.x, z.y] - umap_sample)
            for z in self.umap_df.itertuples()
        ]
        self.umap_df = self.umap_df.sort_values(by="distance")


### MinKNOW API Functions
Check https://github.com/nanoporetech/minknow_api for reference.

The following code requires a patched version of the MinKNOW API. Install it
from https://github.com/neuropathbasel/minknow_api.


In [None]:
# Define logger
logger = logging.getLogger(__name__)

In [None]:
def minknow_manager():
    """Construct a manager using the host and port provided. This is
    used to connect to the MinKNOW service trough the MK API.

    minknow_api.manager.Manager:  a wrapper around MinKNOW's Manager
        gRPC API with utilities for querying sequencing positions and
        offline basecalling tools.
    """
    return Manager(host=THIS_HOST, port=9501, use_tls=False)

In [None]:
def minion_positions():
    """Return MinION devices that are currenty connected to the system."""
    manager = minknow_manager()
    # Find a list of currently available sequencing positions.
    positions = manager.flow_cell_positions()
    # User could call {posisions.connect()} here to connect to the
    # running MinKNOW instance.
    return positions

In [None]:
def connection_from_device_id(device_id):
    """Returns minion position."""
    position = next(
        (pos for pos in minion_positions() if pos.name == device_id),
        False,
    )
    if not position:
        raise ValueError(f"'{device_id}' is not a valid Minion position.")
    connection = position.connect()
    return connection

In [None]:
def flow_cell_id(device_id):
    """Return flow cell ID (if any). Note that some CTCs have an
    empty ID string.
    """
    cell_id = "no_flow_cell"
    positions = minion_positions()
    for p in positions:
        if device_id in p.name:
            connection = p.connect()
            cell_id = connection.device.get_flow_cell_info().flow_cell_id
    return cell_id

In [None]:
def parse_args(args_list):
    """Build and execute a command line argument for starting a protocol.

    Returns:
        Parsed arguments to be used when starting a protocol.
    """
    parser = argparse.ArgumentParser(
        description="""
        Run a sequencing protocol in a running MinKNOW instance.
        """
    )
    parser.add_argument(
        "--host",
        default="localhost",
        help="IP address of the machine running MinKNOW (defaults to localhost)",
    )
    parser.add_argument(
        "--port",
        help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)",
    )
    parser.add_argument(
        "--no-tls",
        help="Disable tls connection",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable debug logging",
    )
    parser.add_argument(
        "--sample-id",
        help="sample ID to set",
    )
    parser.add_argument(
        "--experiment-group",
        "--group-id",
        help="experiment group (aka protocol group ID) to set",
    )
    parser.add_argument(
        "--position",
        help="position on the machine (or MinION serial number) to run the protocol at",
    )
    parser.add_argument(
        "--flow-cell-id",
        metavar="FLOW-CELL-ID",
        help="ID of the flow-cell on which to run the protocol. (specify this or --position)",
    )
    parser.add_argument(
        "--kit",
        required=True,
        help="Sequencing kit used with the flow-cell, eg: SQK-LSK108",
    )
    parser.add_argument(
        "--product-code",
        help="Override the product-code stored on the flow-cell and previously user-specified"
        "product-codes",
    )
    # Basecalling arguments
    parser.add_argument(
        "--basecalling",
        action="store_true",
        help="enable base-calling using the default base-calling model",
    )
    parser.add_argument(
        "--basecall-config",
        help="specify the base-calling config and enable base-calling",
    )
    # Barcoding arguments
    parser.add_argument(
        "--barcoding",
        action="store_true",
        help="protocol uses barcoding",
    )
    parser.add_argument(
        "--barcode-kits",
        nargs="+",
        help="bar-coding expansion kits used in the experiment",
    )
    parser.add_argument(
        "--trim-barcodes",
        action="store_true",
        help="enable bar-code trimming",
    )
    parser.add_argument(
        "--barcodes-both-ends",
        action="store_true",
        help="bar-code filtering (both ends of a strand must have a matching barcode)",
    )
    parser.add_argument(
        "--detect-mid-strand-barcodes",
        action="store_true",
        help="bar-code filtering for bar-codes in the middle of a strand",
    )
    parser.add_argument(
        "--min-score",
        type=float,
        default=0.0,
        help="read selection based on bar-code accuracy",
    )
    parser.add_argument(
        "--min-score-rear",
        type=float,
        default=0.0,
        help="read selection based on bar-code accuracy",
    )
    parser.add_argument(
        "--min-score-mid",
        type=float,
        default=0.0,
        help="read selection based on bar-code accuracy",
    )
    # Alignment arguments
    parser.add_argument(
        "--alignment-reference",
        help="Specify alignment reference to send to basecaller for live alignment.",
    )
    parser.add_argument(
        "--bed-file",
        help="Specify bed file to send to basecaller.",
    )
    # Output arguments
    parser.add_argument(
        "--fastq",
        action="store_true",
        help="enables FastQ file output, defaulting to 4000 reads per file",
    )
    parser.add_argument(
        "--fastq-reads-per-file",
        type=int,
        default=4000,
        help="set the number of reads combined into one FastQ file.",
    )
    parser.add_argument(
        "--fast5",
        action="store_true",
        help="enables Fast5 file output, defaulting to 4000 reads per file, this will store raw, "
        "fastq and trace-table data",
    )
    parser.add_argument(
        "--fast5-reads-per-file",
        type=int,
        default=4000,
        help="set the number of reads combined into one Fast5 file.",
    )
    parser.add_argument(
        "--bam",
        action="store_true",
        help="enables BAM file output, defaulting to 4000 reads per file",
    )
    parser.add_argument(
        "--bam-reads-per-file",
        type=int,
        default=4000,
        help="set the number of reads combined into one BAM file.",
    )
    # Read until arguments
    parser.add_argument(
        "--read-until-reference",
        type=str,
        help="Reference file to use in read until",
    )
    parser.add_argument(
        "--read-until-bed-file",
        type=str,
        help="Bed file to use in read until",
    )
    parser.add_argument(
        "--read-until-filter",
        type=str,
        choices=["deplete", "enrich"],
        help="Filter type to use in read until",
    )
    # Experiment arguments
    parser.add_argument(
        "--experiment-duration",
        type=float,
        default=72,
        help="time spent sequencing (in hours)",
    )
    parser.add_argument(
        "--no-active-channel-selection",
        action="store_true",
        help="allow dynamic selection of channels to select pores for sequencing, "
        "ignored for Flongle flow-cells",
    )
    parser.add_argument(
        "--mux-scan-period",
        type=float,
        default=1.5,
        help="number of hours before a mux scan takes place, enables active-channel-selection, "
        "ignored for Flongle flow-cells",
    )
    parser.add_argument(
        "extra_args",
        metavar="ARGS",
        nargs="*",
        help="Additional arguments passed verbatim to the protocol script",
    )
    args = parser.parse_args(args_list)
    # Further argument checks
    # 'Read until' must have a reference and a filter type, if enabled:
    if (
        args.read_until_filter is not None
        or args.read_until_reference is not None
        or args.read_until_bed_file is not None
    ):
        if args.read_until_filter is None:
            print("Unable to specify read until arguments without a filter type.")
            sys.exit(1)

        if args.read_until_reference is None:
            print("Unable to specify read until arguments without a reference type.")
            sys.exit(1)

    if args.bed_file and not args.alignment_reference:
        print("Unable to specify '--bed-file' without '--alignment-reference'.")
        sys.exit(1)

    if (args.barcoding or args.barcode_kits) and not (
        args.basecalling or args.basecall_config
    ):
        print(
            "Unable to specify '--barcoding' or '--barcode-kits' without '--basecalling'."
        )
        sys.exit(1)
    if args.alignment_reference and not (args.basecalling or args.basecall_config):
        print("Unable to specify '--alignment-reference' without '--basecalling'.")
        sys.exit(1)
    if not (args.fast5 or args.fastq):
        print("No output (fast5 or fastq) specified")

    return args

In [None]:
def start_run(
    device_id="",
    sample_id="",
    run_duration="",
    start_voltage="",
):
    """Start a run on Mk1b devices and perform several checks concerning
    the run protocol.

    Code modified from the MinKNOW API on
    https://github.com/nanoporetech/minknow_api
    (2022-03) created from the sample code at
    https://github.com/nanoporetech/minknow_api/blob/master/python/minknow_api/examples/start_protocol.py

    We need 'find_protocol' to search for the required protocol given a kit
    and product code.
    """
    args_list = [
        "--host", "localhost",
        "--position", device_id,
        "--sample-id", sample_id,
        "--experiment-group", sample_id,
        "--experiment-duration", run_duration,
        "--basecalling",
        "--fastq",
        "--fastq-reads-per-file", READS_PER_FILE,
        "--fast5",
        "--fast5-reads-per-file", READS_PER_FILE,
        "--verbose",
        "--kit", "SQK-RBK004",
        "--barcoding",
        "--barcode-kits", "SQK-RBK004",
        "--",  # Required for so-called extra-arguments.
        "--start_bias_voltage", start_voltage,
    ]

    # Parse arguments to be passed to started protocols:
    args = parse_args(args_list)

    # # Specify --verbose on the command line to get extra details.
    # if args.verbose:
        # logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    # Find which positions we are going to start protocol on:
    positions = [
        pos for pos in minion_positions() if is_position_selected(pos, args)
    ]

    # At least one position needs to be selected:
    if not positions:
        print(
            "No positions selected for protocol - specify "
            "'--position' or '--flow-cell-id'"
        )
        return []

    # Start protocol on the requested postitions:
    print("Starting protocol on %s positions." % len(positions))
    run_ids = []

    for pos in positions:
        # Connect to the sequencing position:
        connection = pos.connect()

        # Check if a flowcell is available for sequencing
        flow_cell_info = connection.device.get_flow_cell_info()
        if not flow_cell_info.has_flow_cell:
            print(f"No flow cell present in position {pos}")
            return []

        # Select product code:
        if args.product_code:
            product_code = args.product_code
        else:
            product_code = flow_cell_info.user_specified_product_code
            if not product_code:
                product_code = flow_cell_info.product_code

        # Find the protocol identifier for the required protocol:
        protocol_info = protocols.find_protocol(
            connection,
            product_code=product_code,
            kit=args.kit,
            basecalling=args.basecalling,
            basecall_config=args.basecall_config,
            barcoding=args.barcoding,
            barcoding_kits=args.barcode_kits,
        )

        if not protocol_info:
            print("Failed to find protocol for position %s" % (pos.name))
            print("Requested protocol:")
            print("  product-code: %s" % args.product_code)
            print("  kit: %s" % args.kit)
            print("  basecalling: %s" % args.basecalling)
            print("  basecall_config: %s" % args.basecall_config)
            print("  barcode-kits: %s" % args.barcode_kits)
            print("  barcoding: %s" % args.barcoding)
            print("Protocol build error, consult application log.")
            return []

        # Store the identifier for later:
        protocol_id = protocol_info.identifier

        # Now select which arguments to pass to start protocol:
        print("Starting protocol %s on position %s" % (protocol_id, pos.name))

        # Set up user specified product code if requested:
        if args.product_code:
            connection.device.set_user_specified_product_code(
                code=args.product_code
            )

        # Build arguments for starting protocol:
        basecalling_args = None
        if args.basecalling or args.basecall_config:
            barcoding_args = None
            alignment_args = None
            if args.barcode_kits or args.barcoding:
                barcoding_args = protocols.BarcodingArgs(
                    args.barcode_kits,
                    args.trim_barcodes,
                    args.barcodes_both_ends,
                    args.detect_mid_strand_barcodes,
                    args.min_score,
                    args.min_score_rear,
                    args.min_score_mid,
                )

            if args.alignment_reference:
                alignment_args = protocols.AlignmentArgs(
                    reference_files=[args.alignment_reference],
                    bed_file=args.bed_file,
                )

            basecalling_args = protocols.BasecallingArgs(
                config=args.basecall_config,
                barcoding=barcoding_args,
                alignment=alignment_args,
            )

        read_until_args = None
        if args.read_until_filter:
            read_until_args = protocols.ReadUntilArgs(
                filter_type=args.read_until_filter,
                reference_files=[args.read_until_reference],
                bed_file=args.read_until_bed_file,
                first_channel=None,  # These default to all channels.
                last_channel=None,
            )

        def build_output_arguments(args, name):
            if not getattr(args, name):
                return None
            return protocols.OutputArgs(
                reads_per_file=getattr(args, "%s_reads_per_file" % name)
            )

        fastq_arguments = build_output_arguments(args, "fastq")
        fast5_arguments = build_output_arguments(args, "fast5")
        bam_arguments = build_output_arguments(args, "bam")

        # print the protocol parameters
        print("connection {connection}")
        print("protocol_id {protocol_id}")
        print("args.sample_id {args.sample_id}")
        print("args.experiment_group {args.experiment_group}")
        print("basecalling_args {basecalling_args}")
        print("read_until_args {read_until_args}")
        print(
            "fastq_arguments {fastq_arguments}"
        )  # fastq_arguments OutputArgs(reads_per_file=400)
        print(
            "fast5_arguments {fast5_arguments}"
        )  # fast5_arguments OutputArgs(reads_per_file=400)
        print("bam_arguments {bam_arguments}")
        print(
            "args.no_active_channel_selection "
            "{args.no_active_channel_selection}"
        )
        print("args.mux_scan_period {args.mux_scan_period}")
        print("args.experiment_duration {args.experiment_duration}")
        print(
            "args.extra_args {args.extra_args}"
        )  # Any extra args passed.

        # Now start the protocol:
        run_id = protocols.start_protocol(
            connection,
            protocol_id,
            sample_id=args.sample_id,
            experiment_group=args.experiment_group,
            basecalling=basecalling_args,
            read_until=read_until_args,
            fastq_arguments=fastq_arguments,
            fast5_arguments=fast5_arguments,
            bam_arguments=bam_arguments,
            disable_active_channel_selection=args.no_active_channel_selection,
            mux_scan_period=args.mux_scan_period,
            experiment_duration=args.experiment_duration,
            args=args.extra_args,  # Any extra args passed.
        )
        run_ids.append(run_id)
    return run_ids

In [None]:
def stop_run(device_id):
    """Stop an existing run (if any) for a MinION device and return the
    protocol ID.
    """
    connection = connection_from_device_id(device_id)
    protocol = connection.protocol.list_protocol_runs()
    protocol_id = protocol.run_ids[-1]
    # TODO @HEJU in stopRun wird über bufferedRunIds geloopt. Notwendig?
    try:
        connection.protocol.stop_protocol()
        return protocol_id
    except grpc._channel._InactiveRpcError:
        return None

In [None]:
def is_position_selected(position, args):
    """Find if the {position} is selected by command line arguments
    {args}.
    Function from minknow_api demos, start_seq.py
    """
    # First check for name match:
    if args.position == position.name:
        return True

    # Then verify if the flow cell matches:
    connected_position = position.connect()
    if args.flow_cell_id is not None:
        flow_cell_info = connected_position.device.get_flow_cell_info()
        if (
            flow_cell_info.user_specified_flow_cell_id == args.flow_cell_id
            or flow_cell_info.flow_cell_id == args.flow_cell_id
        ):
            return True
    return False

In [None]:
def active_run(device_id):
    """Returns active run id."""
    connection = connection_from_device_id(device_id)
    try:
        # Error if no acquisition is running, same as with
        # acquisitio.current_status(), no acquisition until
        # temperature reached
        active_run = connection.acquisition.get_current_acquisition_run().run_id
    except grpc._channel._InactiveRpcError:
        active_run = "none"
    return active_run

In [None]:
def device_activity(device_id):
    """Returns device activity. Virtual test runs will be recognized as
    active.
    """
    connection = connection_from_device_id(device_id)
    status = connection.acquisition.current_status().status
    device_activity = {
        STARTING: "sequencing",
        PROCESSING: "sequencing",
        FINISHING: "sequencing",
        READY: "idle",
    }
    return device_activity.get(status, "")

In [None]:
def real_device_activity(device_id):
    """Returns device activity by checking the target temperature."""
    connection = connection_from_device_id(device_id)
    target_temp =str(
        connection.minion_device.get_settings().temperature_target.min
    )
    device_activity = {
        "34.0": "sequencing",
        "35.0": "idle",
        "37.0": "checking flow cell",
    }
    return device_activity.get(target_temp, "")

In [None]:
def number_of_called_bases(device_id):
    """Returns number of called bases."""
    connection = connection_from_device_id(device_id)
    # Check if device is working.
    if connection.acquisition.current_status().status == READY:
        return 0
    acquisition = connection.acquisition.get_acquisition_info()
    num_bases = acquisition.yield_summary.estimated_selected_bases
    return num_bases

In [None]:
def device_status(device_id):
    """MinKNOW status for device {device_id}."""
    connection = connection_from_device_id(device_id)
    current_bases = number_of_called_bases(device_id)
    needed_mb = round(NEEDED_NUMBER_OF_BASES // 1e6, 2)
    current_mb = round(current_bases / 1e6, 2)
    progress = round(100*current_mb/needed_mb, 1)
    status = {
        "Real device activity": real_device_activity(device_id),
        "Active run": active_run(device_id),
        "Progress": f"{progress}% ({current_mb} MB / {needed_mb} MB)",
        "acquisition.get_acquisition_info().state": str(
            connection.acquisition.get_acquisition_info().state
        ),
        "acquisition.current_status()": str(
            connection.acquisition.current_status()
        ),
        "minion_device.get_settings().temperature_target.min": str(
            connection.minion_device.get_settings().temperature_target.min
        ),
        "device.get_temperature()": str(
            connection.device.get_temperature().minion.heatsink_temperature
        ),
        "device.get_bias_voltage()": str(connection.device.get_bias_voltage()),
    }
    # Progress is only needed if device is sequencing.
    if active_run(device_id) == "none":
        status.pop("Progress")
    return status

In [None]:
def run_state(device_id):
    """Obtain further information about a particular device / run."""
    connection = connection_from_device_id(device_id)
    try:
        state = f"Run state for {device_id}: "
        state += str(connection.protocol.get_current_protocol_run().state)
        state += "/"
        state += str(connection.acquisition.get_acquisition_info().state)
    except grpc._channel._InactiveRpcError:
        state = f"No state information in MinKNOW buffer for {device_id}"
    return state

In [None]:
def run_sample_id(device_id):
    """Get sample ID from MinKNOW by device, only available after data
    acquisition has been initiated by MinKNOW.
    """
    connection = connection_from_device_id(device_id)
    try:
        sample_id = (
            connection.protocol.get_current_protocol_run().user_info.sample_id.value
        )
    except grpc._channel._InactiveRpcError:
        sample_id = (
            f"No sampleId information in MinKNOW buffer for {device_id}"
        )
    return sample_id

In [None]:
def run_yield(device_id):
    """Get run yield by device. The data of the previous run will remain in
    the buffer until acquisition (not just a start) of a new run have been
    initiated.
    """
    connection = connection_from_device_id(device_id)
    try:
        acq_info = connection.acquisition.get_acquisition_info()
        yield_ = f"Run yield for {device_id} ({acq_info.run_id}):&nbsp;"
        yield_ += str(acq_info.yield_summary)
    # TODO this exception has not been tested.
    except grpc._channel._InactiveRpcError:
        yield_ = f"No yield information in MinKNOW buffer for {device_id}"
    return yield_

In [None]:
def run_information(device_id):
    """Get current run information. Only available after data acquisition
    has started.
    """
    connection = connection_from_device_id(device_id)
    try:
        info = (f"Run information for {device_id}<br><br>"
               + str(connection.protocol.get_current_protocol_run()))
    except grpc.RpcError:
        info = f"No protocol information in MinKNOW buffer for {device_id}"
    return info

In [None]:
def set_bias_voltage(device_id, voltage):
    """Change MinKnow bias voltage."""
    connection = connection_from_device_id(device_id)
    previous_voltage = connection.device.get_bias_voltage().bias_voltage
    connection.device.set_bias_voltage(bias_voltage=float(voltage))

In [None]:
def single_file_methylation_caller(analysis_dir):
    """Invokes f5c methylation caller on a single fast5/fastq file and
    calculates methylation frequencies and CpG overlaps.

    Args:
        analysis_dir: Directory containing fast5 and fastq files to be
            analyzed. The basename of these files (corresponding to the
            run id) must be equal to the name of analysis_dir.
            The resulting files of the methylation calling process will
            be saved in analysis_dir.
    """
    file_name = os.path.basename(analysis_dir)
    base_path = os.path.join(analysis_dir, file_name)
    # Create index file for f5c.
    f5c_index = [
        F5C, "index",
        "-t", "1",
        "--iop", "100",
        "--directory", analysis_dir,
        base_path + ".fastq",
    ]
    # Aligns reads to reference genome and sorts resulting bam files
    # (4 threads).
    seq_align = [
        MINIMAP2,
        "-a",
        "-x", "map-ont",
        REFERENCE_GENOME_MMI,
        base_path + ".fastq",
        "-t", "4",
        "|",
        SAMTOOLS, "sort",
        "-T", "tmp",
        "-o", base_path + "-reads_sorted.bam",
    ]
    # Make bam index for samtools.
    bam_index = [
        SAMTOOLS, "index",
        base_path + "-reads_sorted.bam",
    ]
    # Methylation caller.
    methyl_calling = [
        F5C, "call-methylation",
        #"--disable-cuda=yes",   # For debugging on CPU only.
        "-B2000000", "-K400",    # Set B to 2 megabases (GPU) and 0.4 kreads
        "-b", base_path + "-reads_sorted.bam",
        "-g", REFERENCE_GENOME_FA,
        "-r", base_path + ".fastq",
        ">", base_path + "-result.tsv",
    ]
    # Calculate methylation frequencies.
    methyl_frequency = [
        F5C, "meth-freq",
        "-c", "2.5",
        "-s",
        "-i", base_path + "-result.tsv",
        ">",
        base_path + "-freq.tsv",
    ]
    commands = [
        f5c_index,
        seq_align,
        bam_index,
        methyl_calling,
        methyl_frequency,
    ]

    def log_subprocess_output(pipe):
        """Logs subprocess (to file) and also sends output to stdout."""
        stdout_data, stderr_data = pipe.communicate()
        for line in stdout_data.decode().split("\n"):
            logger.info(line)
            print(line)
        for line in stderr_data.decode().split("\n"):
            logger.info(line)
            print(line)

    for cmd in commands:
        cmd_str = " ".join(cmd)
        process = subprocess.Popen(
            cmd_str,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        log_subprocess_output(process)
        exit_code = process.wait()
        if exit_code != 0:
            logger.error("Error occured on subprocess '%s'", cmd[0])
    # Calculate CpG overlap.
    extract_referenced_cpgs(
        base_path + "-freq.tsv",
        base_path + "-methoverlap.tsv",
        base_path + "-methoverlapcount.txt",
    )
    # Write empty textfile to signal successful completion.
    with open(os.path.join(analysis_dir, "done.txt"), "w"):
        pass
    print(f"Methylation calling on {file_name} done.")

In [None]:
def remove_dirs_with_wrong_barcode(sample_name, barcode):
    """Removes directories with wrong barcode due to change of
    the predominant barcode after the first reads.
    """
    output_dirs = [
        os.path.join(NANODIP_OUTPUT, sample_name, dir_nm) for dir_nm in
        os.listdir(os.path.join(NANODIP_OUTPUT, sample_name))
    ]
    wrong_barcode_dirs = [
        f for f in output_dirs if barcode not in f
    ]
    for dir_path in wrong_barcode_dirs:
        shutil.rmtree(dir_path)

In [None]:
def methylation_caller(sample_name, analyze_one=True):
    """Searches for callable fast5/fastq files that have not yet been
    called and invokes methylation calling. Results will be added to
    the NANODIP_OUTPUT directory.

    Args:
        sample_name: Name of sample to be analyzed.
        analyse_one: If True only first fast5/fastq file found
                     will be analyzed.
    """
    # At least 2 "passed" files need to be present.
    barcode = predominant_barcode(sample_name)
    remove_dirs_with_wrong_barcode(sample_name, barcode)
    fast5_files = [
        f for f in files_by_ending(DATA, sample_name, ending=".fast5")
        if barcode in f
    ]
    # Analyse in alphanumeric ordering for improved debugging.
    fast5_files.sort()
    def from_5_to_q(fn):
        return fn.replace(
            ".fast5", ".fastq"
        ).replace("fast5_pass", "fastq_pass")

    # Collect all passed fast5/fastq pairs
    fast5q_file_pairs = [
        [f, from_5_to_q(f)] for f in fast5_files
        if os.path.exists(from_5_to_q(f))
    ]

    f5c_analysis_dir = os.path.join(NANODIP_OUTPUT, sample_name)
    if not os.path.exists(f5c_analysis_dir):
        os.mkdir(f5c_analysis_dir)

    prev_called = []
    curr_called = []
    not_called = []

    for f5, fq in fast5q_file_pairs:
        file_name = os.path.basename(f5).split(".")[0]
        analysis_dir = os.path.join(f5c_analysis_dir, file_name)
        symlink5 = os.path.join(analysis_dir, file_name + ".fast5")
        symlinkq = os.path.join(analysis_dir, file_name + ".fastq")
        if not os.path.exists(analysis_dir):
            os.mkdir(analysis_dir)
        if not os.path.exists(symlink5):
            os.symlink(f5, symlink5)
        if not os.path.exists(symlinkq):
            os.symlink(fq, symlinkq)
        done = os.path.join(analysis_dir, "done.txt")
        if os.path.exists(done):
            prev_called.append(file_name)
        else:
            not_called.append(
                [analysis_dir, file_name]
            )
    for directory, file_name in not_called:
        single_file_methylation_caller(directory)
        curr_called.append(file_name)
        if analyze_one:
            break
    num_completed = len(prev_called) + len(curr_called)
    num_fastq = len(fast5q_file_pairs)
    no_callable_left = num_fastq == num_completed
    return {
        "barcode": barcode,
        "called": curr_called,
        "num_completed": num_completed,
        "num_fastq": num_fastq,
        "no_callable_left": no_callable_left,
        "time": date_time_string_now(),
    }


### CherryPy Web UI
The browser-based user interface is based on CherryPy, which contains an
integrated web server and serves pages locally. Communication between the
service and browser typically generates static web pages that may or may not
contain automatic self refresh commands. In the case of self-refreshing pages,
the browser will re-request a given page with leads to re-execution of the
respective python functions. The main handles to these function are located in
the Web UI cell below.


In [None]:
# Define logger
logger = logging.getLogger(__name__)

In [None]:
class Device:
    """Container used to store auto-termination status for a single
    device.
    """
    def __init__(self, device_id):
        self.id = device_id
        self.termination_type = "manually"

    def __repr__(self):
        return f"(id={self.id}, termination_type={self.termination_type})"

In [None]:
class Devices:
    """List of Device objects, used to store auto-termination status for
    all devices.
    """
    def __init__(self):
        self.list = []

    def get(self, device_id):
        """Appends device {device_id} if necessary and returns it."""
        device = self.get_device(device_id)
        if device:
            return device
        device = Device(device_id)
        self.list.append(device)
        return device

    def pop(self, device_id):
        """Removes and returns device."""
        device = self.get_device(device_id)
        if device:
            self.list.remove(device)
            return device
        return None

    def get_device(self, device_id):
        """Returns device with given id. Returns false if id is not found."""
        return next(
            (device for device in self.list if device_id == device.id),
            False,
        )

    def __iter__(self):
        return iter(self.list)

    def __contains__(self, other):
        return other in [d.id for d in self]

    def __repr__(self):
        out = "["
        out += ", ".join([str(d) for d in self.list])
        out += "]"
        return out

In [None]:
def download_epidip_data(sentrix_id, reference_umap):
    """Downloads UMAP plot coordinates of reference data and CNV plot of
    sample with given Sentrix ID.
    """
    umap_coordinates_local = composite_path(
        NANODIP_REPORTS,
        sentrix_id,
        reference_umap[:-5],
        ENDING["umap_xlsx"],
    )

    url = UMAP_LINK % reference_umap
    request.urlretrieve(url, umap_coordinates_local)

    cnv_local = composite_path(NANODIP_REPORTS, sentrix_id, ENDING["cnv_pdf"])
    url = CNV_LINK % sentrix_id
    request.urlretrieve(url, cnv_local)

    image = convert_from_path(cnv_local)[0]
    image.save(cnv_local.replace("pdf", "png"), "png")

In [None]:
class UI:
    """User interface implemented as CherryPy webserver."""
    # global variables within the CherryPy Web UI
    cnv_sem = threading.Semaphore()
    umap_sem = threading.Semaphore()
    cpg_sem = threading.Semaphore()
    devices = Devices()

    @cherrypy.expose
    def index(self):
        """Start page."""
        total, used, free = shutil.disk_usage(DATA)
        sys_stat = {
            "hostname": socket.gethostname(),
            "disk_total": total // (2**30),
            "disk_used": used // (2**30),
            "disk_free": free // (2**30),
            "memory_free": round(
                psutil.virtual_memory().available * 100
                / psutil.virtual_memory().total
            ),
            "cpu": round(psutil.cpu_percent()),
            "cpgs": 1 - UI.cpg_sem._value,
            "cnvp": 1 - UI.cnv_sem._value,
            "umap": 1 - UI.umap_sem._value,
        }
        # Calculate URLs to avoid hard coding URLs in HTML templates.
        return render_template(
            "index.html",
            sys_stat=sys_stat,
            url_restart=url_for(UI.restart),
        )

    @cherrypy.expose
    def restart(self):
        """Restart CherryPy."""
        cherrypy.engine.restart()
        return render_template("restart.html")

    @cherrypy.expose
    def status(self):
        """List all devices with run-status and show UMAP/CMV preview if
        available.
        """
        device_ids = [pos.name for pos in minion_positions()]
        # Calculate URLs to avoid hard coding URLs in HTML templates.
        return render_template(
            "status.html",
            device_ids=device_ids,
            url_live_device_status=url_for(UI.status_device),
            url_live_plots=url_for(UI.status_plots),
        )

    @cherrypy.expose
    def status_device(self, device_id):
        """Lists run-status of device {device_id} and prints button for
        terminating run.
        """
        UI.launch_auto_terminator(device_id)
        status = None
        device = UI.devices.get(device_id)
        is_active = active_run(device_id) != "none"
        try:
            status = device_status(device_id)
            previous_activity = True
        except grpc._channel._InactiveRpcError:
            previous_activity = False
        return render_template(
            "status_device.html",
            device_id=device_id,
            status=status,
            flow_cell_id=flow_cell_id(device_id),
            sample_id=run_sample_id(device_id),
            yield_=run_yield(device_id),
            state=run_state(device_id),
            previous_activity=previous_activity,
            needed_mega_bases=NEEDED_NUMBER_OF_BASES // 1e6,
            url_auto_terminator = url_for(UI.set_auto_terminate),
            termination_type=device.termination_type,
            is_active=is_active,
        )

    @cherrypy.expose
    def status_plots(self, device_id=""):
        """Generate a live preview of the data analysis with the current
        plots.
        """
        if not device_id:
            raise cherrypy.HTTPError(404, "URL not found")
        # If there is a run that produces data, the run ID will exist.
        sample_id = run_sample_id(device_id)

        # Chose reference with latest png-UMAP file.
        umap_png_files = [
            f for f in os.listdir(NANODIP_REPORTS)
            if f.endswith(ENDING["umap_all_png"]) and sample_id in f
        ]
        if not umap_png_files:
            # Dummy reference name if no UMAP files are found (happens if
            # device is not sequencing and thus no sample_id can be found.
            reference = "none"
        else:
            latest_umap_png = max(
                [os.path.join(NANODIP_REPORTS, f) for f in umap_png_files],
                key=os.path.getmtime,
            )
            reference = re.search(
                sample_id +  "_(.*?)_" + ENDING["umap_all_png"],
                latest_umap_png
            ).group(1)

        cnv_plt_path_png = composite_path(
            "reports", sample_id, ENDING["cnv_png"],
        )
        cnv_plt_path_html = composite_path(
            "reports", sample_id, ENDING["cnv_html"],
        )
        umap_plt_path_png = composite_path(
            "reports", sample_id, reference, ENDING["umap_all_png"],
        )
        umap_plt_path_html = composite_path(
            "reports", sample_id, reference, ENDING["umap_all_html"],
        )
        return render_template(
            "status_plots.html",
            sample_id=sample_id,
            reference=reference,
            cnv_plt_path_png=cnv_plt_path_png,
            cnv_plt_path_html=cnv_plt_path_html,
            umap_plt_path_png=umap_plt_path_png,
            umap_plt_path_html=umap_plt_path_html,
        )

    @cherrypy.expose
    def start(
        self,
        device_id="",
        sample_id="",
        run_duration="",
        reference_id="",
        start_voltage="",
    ):
        """Start sequencing run."""
        start_now = bool(sample_id) and float(run_duration) >= 0.1
        if start_now:
            # Delete termination status info of last round.
            UI.devices.pop(device_id)
            run_ids = start_run(
                device_id=device_id,
                sample_id=sample_id,
                run_duration=run_duration,
                start_voltage=start_voltage,
            )
            return render_template(
                "start.html",
                start_now=start_now,
                test=False,
                sample_id=sample_id,
                reference_id=reference_id,
                device_id=device_id,
                run_id=" / ".join(run_ids),
                run_info=run_information(device_id),
            )
        positions = [p.name for p in minion_positions()]
        idle = [
            p for p in positions if real_device_activity(p) == "idle"
            and flow_cell_id(p) != ""
        ]
        flow_cell = {pos:flow_cell_id(pos) for pos in idle}
        return render_template(
            "start.html",
            start_now=start_now,
            test=False,
            idle=idle,
            flow_cell=flow_cell,
            references=reference_annotations(),
        )

    @cherrypy.expose
    def start_test(self, device_id=""):
        """Start sequencing test run."""
        if device_id:
            sample_id = (date_time_string_now() + "_TestRun_"
                + flow_cell_id(device_id))
            run_ids = start_run(
                device_id=device_id,
                sample_id=sample_id,
                run_duration="0.1",
                start_voltage="-180",
            )
            return render_template(
                "start.html",
                start_now=True,
                sample_id=sample_id,
                reference_id="TEST",
                device_id=device_id,
                run_id=" / ".join(run_ids),
                run_info=run_information(device_id),
            )
        positions = [p.name for p in minion_positions()]
        idle = [p for p in positions if real_device_activity(p) == "idle"
            and flow_cell_id(p) != ""]
        flow_cell = {pos:flow_cell_id(pos) for pos in idle}
        return render_template(
            "start.html",
            start_now=False,
            test=True,
            idle=idle,
            flow_cell=flow_cell,
            references=reference_annotations(),
        )

    @cherrypy.expose
    def stop_sequencing(self, device_id=""):
        """Can be used to manually stop a run."""
        protocol_id = stop_run(device_id)
        if protocol_id is None:
            return "No protocol running, nothing was stopped."
        return f"Protocol {protocol_id} stopped on {device_id}."

    @cherrypy.expose
    def list_runs(self):
        """Lists running and buffered sequencing runs."""
        mounted_flow_cell_id = {}
        current_status = {}
        flow_cell = {}
        run_ids = {}
        device_names = []

        for minion in minion_positions():
            name = minion.name
            connection = minion.connect()
            device_names.append(name)
            mounted_flow_cell_id[name] = connection.device.get_flow_cell_info(
                ).flow_cell_id
            # READY, STARTING, sequencing/mux = PROCESSING, FINISHING;
            # Pause = PROCESSING
            current_status[name] = connection.acquisition.current_status()
            protocols = connection.protocol.list_protocol_runs()
            run_ids[name] = protocols.run_ids
            for run_id in run_ids[name]:
                run_info = connection.protocol.get_run_info(run_id=run_id)
                flow_cell[(name, run_id)] = run_info.flow_cell.flow_cell_id

        return render_template(
            "list_runs.html",
            device_names=device_names,
            host=CHERRYPY_HOST,
            mounted_flow_cell_id=mounted_flow_cell_id,
            current_status=current_status,
            flow_cell=flow_cell,
            run_ids=run_ids,
        )

    @cherrypy.expose
    def results(self):
        """Lists all files in NANODIP_REPORTS."""
        files = get_all_results()
        urls = {f:f"reports/{f}" for f in files}
        return render_template(
            "results.html",
            files=files,
            urls=urls,
        )

    @cherrypy.expose
    def analysis(
        self,
        func="",
        sample_name="",
        reference_name="",
        new="False",
    ):
        """Creates an overview of all samples and provides the analysis
        tools (CpG methylation calling, CNV plot and UMAP plot).
        """
        if func == "":
            analysis_runs = [
                run for run in get_runs() if not any(
                    pattern in run for pattern in ANALYSIS_EXCLUSION_PATTERNS
                )
            ]
            annotations = reference_annotations()
            # Calculate URLs to avoid hard coding URLs in HTML templates.
            url_cnv = {}
            url_cnv_new = {}
            url_cpgs = {}
            url_pdf = {}
            url_umap = {}
            url_umap_new = {}
            for run in analysis_runs:
                url_cnv[run] = url_for(
                    UI.analysis, func="cnv", sample_name=run,
                )
                url_cnv_new[run] = url_for(
                    UI.analysis, func="cnv", sample_name=run, new=True,
                )
                url_cpgs[run] = url_for(
                    UI.analysis, func="cpgs", sample_name=run,
                )
                for annotation in annotations:
                    url_umap_new[(run,annotation)] = url_for(
                        UI.analysis,
                        func="umap",
                        sample_name=run,
                        reference_name=annotation,
                        new=True,
                    )
                    url_umap[(run,annotation)] = url_for(
                        UI.analysis,
                        func="umap",
                        sample_name=run,
                        reference_name=annotation,
                    )
                    url_pdf[(run,annotation)] = url_for(
                        UI.make_pdf,
                        sample_name=run,
                        reference_name=annotation,
                    )
            return render_template(
                "analysis_start.html",
                analysis_runs=analysis_runs,
                annotations=annotations,
                url_cnv=url_cnv,
                url_cnv_new=url_cnv_new,
                url_cpgs=url_cpgs,
                url_pdf=url_pdf,
                url_umap=url_umap,
                url_umap_new=url_umap_new,
            )
        if func == "cnv":
            genome = Genome()
            genes = genome.genes.name.to_list()
            return render_template(
                "analysis_cnv.html",
                url_cnv=url_for(UI.cnv),
                sample_name=sample_name,
                genes=genes,
                new=new,
            )
        if func == "umap":
            return render_template(
                "analysis_umap.html",
                url_umap=url_for(UI.umap),
                sample_name=sample_name,
                reference_name=reference_name,
                new=new,
                first_use = not binary_reference_data_exists(),
            )
        if func == "cpgs":
            return render_template(
                "analysis_cpg.html",
                url_cpgs=url_for(UI.cpgs),
                start_time=date_time_string_now(),
                sample_name=sample_name,
            )
        raise cherrypy.HTTPError(404, "URL not found")

    @cherrypy.expose
    def cnv(self, sample_name, genes="", new="False"):
        """Creates CNV plot and returns it as JSON."""
        UI.cnv_sem.acquire()
        try:
            cnv_data = CNVData(sample_name)
        except FileNotFoundError:
            raise cherrypy.HTTPError(405, "URL not allowed")

        if not cnv_data.files_on_disk() or new == "True":
            try:
                cnv_data.make_cnv_plot()
            except ValueError:
                UI.cnv_sem.release()
                raise cherrypy.HTTPError(405, "No data to plot.")
        else:
            cnv_data.read_from_disk()
        UI.cnv_sem.release()
        return cnv_data.plot_cnv_and_genes([genes])

    @cherrypy.expose
    def umap(self, sample_name, reference_name, close_up="", new="False"):
        """Creates UMAP plot and returns it as JSON."""
        UI.umap_sem.acquire()
        try:
            umap_data = UMAPData(sample_name, reference_name)
        except FileNotFoundError:
            raise cherrypy.HTTPError(405, "URL not allowed")
        if not umap_data.files_on_disk() or new == "True":
            try:
                umap_data.make_umap_plot()
            except ValueError:
                UI.umap_sem.release()
                raise cherrypy.HTTPError(405, "No data to plot.")
        else:
            umap_data.read_from_disk()
        UI.umap_sem.release()
        if close_up == "True":
            return umap_data.cu_plot_json
        return umap_data.plot_json

    @cherrypy.expose
    def make_pdf(self, sample_name=None, reference_name=None):
        """Generates PDF report."""
        path_cgp = composite_path(
            NANODIP_REPORTS, sample_name, reference_name, ENDING["cpg_cnt"]
        )
        path_reads = composite_path(
            NANODIP_REPORTS, sample_name, ENDING["aligned_reads"]
        )
        try:
            with open(path_cgp, "r") as f:
                overlap_cnt = f.read()
            with open(path_reads, "r") as f:
                read_numbers = f.read()
        except FileNotFoundError:
            overlap_cnt = 0
            read_numbers = 0
        cnv_path = composite_path(
            NANODIP_REPORTS, sample_name, ENDING["cnv_png"]
        )
        umap_path = composite_path(
            NANODIP_REPORTS, sample_name, reference_name, ENDING["umap_top_png"],
        )
        pie_chart_path = composite_path(
            NANODIP_REPORTS, sample_name, reference_name, ENDING["pie"]
        )

        html_report = render_template(
            "pdf_report.html",
            sample_name=sample_name,
            sys_name=socket.gethostname(),
            date=date_time_string_now(),
            barcode=predominant_barcode(sample_name),
            reads=read_numbers,
            cpg_overlap_cnt=overlap_cnt,
            reference=reference_name,
            cnv_path=cnv_path,
            umap_path=umap_path,
            pie_chart_path=pie_chart_path,
        )
        report_path = composite_path(
            NANODIP_REPORTS, sample_name, reference_name, ENDING["report"],
        )
        server_report_path = composite_path(
            "reports", sample_name, reference_name, ENDING["report"],
        )
        convert_html_to_pdf(html_report, report_path)
        raise cherrypy.HTTPRedirect(server_report_path)

    @cherrypy.expose
    def set_auto_terminate(self, device_id="", termination_type=""):
        """Sets auto termination status."""
        device = UI.devices.get(device_id)
        if termination_type in ["terminated", "manually", "auto"]:
            device.termination_type = termination_type
            logger.info(
                "Auto terminate status of %s set to %s",
                device_id,
                device.termination_type,
            )
        else:
            raise cherrypy.HTTPError(
                404, "Invalid termination type: '{termination_type}'"
            )
        if termination_type == "terminated":
            stop_run(device.id)

    def launch_auto_terminator(device_id=""):
        """Terminates the current run if auto terminator is set."""
        device = UI.devices.get(device_id)
        if (
            device.termination_type == "auto" and
            number_of_called_bases(device.id) > NEEDED_NUMBER_OF_BASES
        ):
            stop_run(device.id)

    @cherrypy.expose
    def cpgs(self, sample_name=""):
        """Invokes methylation calling and returns statistics."""
        UI.cpg_sem.acquire()
        stats = methylation_caller(sample_name)
        UI.cpg_sem.release()
        return json.dumps(stats)

    @cherrypy.expose
    def change_voltage(self, device_id="", voltage=""):
        """Change bias voltage."""
        set_bias_voltage(device_id, voltage)
        return render_template(
            "change_voltage.html",
            voltage=voltage,
        )

    @cherrypy.expose
    def epidip_report(
        self,
        sentrix_id=None,
        reference_id=None,
        reference_umap=None,
    ):
        """Create report for reference case {sentrix_id} with precalculated
        umap coordinates.
        """
        if sentrix_id and reference_id and reference_umap:
            download_epidip_data(sentrix_id, reference_umap)
            umap_data = UMAPData(sentrix_id, reference_id)
            umap_data.read_precalculated_umap_matrix(reference_umap)
            umap_data.draw_pie_chart()
            umap_data.draw_scatter_plots()
            umap_data.save_to_disk()
            UI.make_pdf(
                self, sample_name=sentrix_id, reference_name=reference_id
            )
        else:
            return render_template(
                "epidip_report.html",
                reference_umap=reference_umap,
                epidip_umaps=EPIDIP_UMAP_COORDINATE_FILES,
                references=reference_annotations(),
            )

    @cherrypy.expose
    def about(self):
        """About Page."""
        return render_template("about.html")

In [None]:
def start_webserver():
    """Start CherryPy Webserver."""
    if DEBUG_MODE:
        #Set access logging
        cherrypy.log.screen = True
        cherrypy.config.update({'log.screen': True})
    else:
        #Set access logging
        cherrypy.log.screen = False
        cherrypy.config.update({'log.screen': False})
        cherrypy.config.update({ "environment": "embedded" })
    cherrypy.log.access_file = composite_path(NANODIP_REPORTS, "cherrypy.log")
    cherrypy.log.error_file = composite_path(NANODIP_REPORTS, "cherrypy.log")

    print(f"NanoDiP server running at http://{CHERRYPY_HOST}:{CHERRYPY_PORT}")

    cherrypy_config = {
        '/favicon.ico': {
            'tools.staticfile.on': True,
            'tools.staticfile.filename': BROWSER_FAVICON,
        },
        '/reports': {
            'tools.staticdir.on': True,
            'tools.staticdir.dir': NANODIP_REPORTS,
        },
        '/static': {
            'tools.staticdir.on': True,
            'tools.staticdir.dir': os.path.join(os.getcwd(), "static"),
        },
    }
    cherrypy.quickstart(UI(), "/", cherrypy_config)

In [None]:
# Set logging options
logging.basicConfig(
    filename="/data/nanodip_reports/nanodip.log",
    # stream=sys.stdout,
    level=logging.INFO,
    format="%(levelname)s %(filename)s,%(lineno)d [%(asctime)s]: %(message)s",
    filemode="w",
)


if __name__ == "__main__":
    sanity_check()
    start_webserver()


### ^^^ LIVE LOG ABOVE ^^^
All CherryPy access will be logged here, including live progress bars for
computationally intense analyses. Detailed access logging is turned off by
default (accessLogging is False), but can be turned on, e.g., for debugging,
in the configuration section at the beginning of this notebook. While it is not
required to have at look at these during normal operation, information
contained in the log may be helpful in troubleshooting. Line numbers in error
messages indicated here typically match those given in the respective Jupyter
Notebook cells.

To preserve these messages, halt the Python kernel, save and close the notebook
to send it for support. This makes sure that the code as well as the error
messages will be preserved.

To launch the user interface, wait until you see a pink log entry that the web
server has started, then navigate to http://localhost:8080.
