# COSMX COUNT QC REPORT FOR SLIDE {{ COSMX_SLIDE_NAME }}
* **Notebook version:** v0.0.1
* **Created by:** NIHR Imperial BRC Genomics Facility
* **Maintained by:** NIHR Imperial BRC Genomics Facility
* **Docker image path:** [Dockerfile](https://github.com/imperial-genomics-facility/igf-dockerfiles/tree/main/cosmx/Dockerfile_v1)
* **Notebook code path:** [Templates](https://github.com/imperial-genomics-facility/igf-dockerfiles/tree/main/cosmx/)
* **Created on:** {{ DATE_TAG }}
* **Contact us:** [NIHR Imperial BRC Genomics Facility - Contact us](https://www.imperial.ac.uk/medicine/research-and-impact/facilities/genomics-facility/contact-us/)
* **License:** [Apache License 2.0](https://github.com/imperial-genomics-facility/igf-dockerfiles/blob/main/LICENSE)

* **Project name:** {{ COSMX_PROJECT_NAME }}
* **COSMX slide name:** {{ COSMX_SLIDE_NAME }}

In [None]:
import os
import json
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Tuple
from IPython.display import HTML, Markdown

In [None]:
# Notebook parameters. The placeholder values are presumably substituted by a
# templating step before the notebook is executed.

# Directory searched for the slide's "*exprMat_file.csv.gz" flat file.
SLIDE_FLAT_FILE_DIR = "{{ SLIDE_FLAT_FILE_DIR }}"
# Passed below as the json_output_file argument of generate_count_qc_table,
# which opens it for writing as a file.
# NOTE(review): the name says DIR but the value is used as a file path - confirm
# what the template actually supplies here.
JSON_OUTPUT_DIR = "{{ JSON_OUTPUT_DIR }}"
# Slide metadata JSON file from which the assay type and panel name are read.
SLIDE_METADATA_JSON_FILE = "{{ SLIDE_METADATA_JSON_FILE }}"

In [None]:
%%capture
def get_assay_and_panel_info_from_slide_meatadata(slide_metadata_json: str, assay_type_key: str = 'assay_type', panel_name_key: str = 'panel_name') -> Tuple[str, str]:
    """Read the assay type and panel name from a slide metadata JSON file.

    The misspelling in the function name ("meatadata") is kept so existing
    callers keep working.

    :param slide_metadata_json: Path to the slide metadata JSON file
    :param assay_type_key: Top-level key that holds the assay type
    :param panel_name_key: Top-level key that holds the panel name
    :returns: A tuple of (assay type, panel name)
    :raises KeyError: If either key is absent or its value is null
    """
    with open(slide_metadata_json, 'r') as fp:
        json_data = json.load(fp)
    assay_type = json_data.get(assay_type_key)
    panel_name = json_data.get(panel_name_key)
    # A JSON null is treated the same as a missing key.
    if assay_type is None or panel_name is None:
        raise KeyError(f"Failed to get assay type or panel name from file {slide_metadata_json}")
    return assay_type, panel_name

In [None]:
%%capture
def get_expr_mat_file_for_slide(slide_flat_file_dir: str) -> str:
    """Locate the expression matrix flat file inside a slide's FlatFiles dir.

    :param slide_flat_file_dir: Directory to search (not searched recursively)
    :returns: POSIX-style path of the matching "*exprMat_file.csv.gz" file;
        when several files match, the first one in sorted order is returned
    :raises IOError: If the directory does not exist
    :raises ValueError: If no matching file is found
    """
    slide_flat_file_path = Path(slide_flat_file_dir)
    if not slide_flat_file_path.exists():
        raise IOError(f"FlatFiles dir {slide_flat_file_dir} not found")
    # glob() yields entries in arbitrary order; sorting makes the choice
    # reproducible when more than one file matches.
    expr_mat_files = sorted(slide_flat_file_path.glob("*exprMat_file.csv.gz"))
    if not expr_mat_files:
        raise ValueError(f"No exprMat_file.csv.gz found in {slide_flat_file_dir}")
    return expr_mat_files[0].as_posix()

In [None]:
%%capture
def get_rna_expr_counts(exprMat_file: str) -> Tuple[List[dict], List[dict]]:
    """Compute per-FOV count QC metrics from an RNA expression matrix.

    The gzipped CSV is expected to have 'fov' and 'cell_ID' columns. Columns
    whose names start with "SystemControl" or "Negative" are treated as
    control probes, and every remaining column is treated as a gene.

    :param exprMat_file: Path to the gzip-compressed expression matrix CSV
    :returns: A tuple of two lists with one dict per FOV:
        * display records with human-readable keys and formatted values
        * database records with snake_case keys and numeric values; these
          are parsed back from the formatted strings, so they carry the same
          rounding as the display records
    :raises ValueError: If the file cannot be read or processed, or if it
        contains no FOV rows
    """
    try:
        expr_df = pd.read_csv(exprMat_file, compression="gzip")
        ## get columns
        sys_ctrl_cols = [c for c in expr_df.columns if c.startswith("SystemControl")]
        neg_cols = [c for c in expr_df.columns if c.startswith("Negative")]
        non_gene_cols = {'fov', 'cell_ID', *sys_ctrl_cols, *neg_cols}
        gene_cols = [c for c in expr_df.columns if c not in non_gene_cols]
        count_qc_data = []
        ## count per fov
        for fov_id, f_data in expr_df.groupby('fov'):
            total_gene_count = f_data[gene_cols].sum(axis=1)
            genes_per_cell = (f_data[gene_cols] > 0).sum(axis=1)
            mean_neg_probe = f_data[neg_cols].sum(axis=1).mean()
            total_cells = len(f_data.index)
            # A cell is "non-empty" when it has at least one gene transcript.
            non_empty_cells = int((total_gene_count > 0).sum())
            count_qc_data.append({
                'FOV': fov_id,
                'Mean transcript per cell': f"{total_gene_count.mean():.2f}",
                'Mean unique genes per cell': f"{genes_per_cell.mean():.2f}",
                'Number of Non-empty cells': f"{non_empty_cells}",
                'PCT of Non-empty cells': f"{(non_empty_cells / total_cells):.2f}",
                '10th percentile transcript per cell': f"{total_gene_count.quantile(0.1):.1f}",
                '90th percentile transcript per cell': f"{total_gene_count.quantile(0.9):.1f}",
                'Mean Negprobe counts per cell': f"{mean_neg_probe:.3f}"})
        if not count_qc_data:
            raise ValueError("no FOV rows found in expression matrix")
        ## build the database records once, after all FOVs are collected
        db_df = \
            pd.DataFrame(count_qc_data).rename(
                columns={
                    'FOV': "fov_id",
                    'Mean transcript per cell': 'mean_transcript_per_cell',
                    'Mean unique genes per cell': 'mean_unique_genes_per_cell',
                    'Number of Non-empty cells': 'number_non_empty_cells',
                    'PCT of Non-empty cells': 'pct_non_empty_cells',
                    '10th percentile transcript per cell': 'percentile_10_transcript_per_cell',
                    '90th percentile transcript per cell': 'percentile_90_transcript_per_cell',
                    'Mean Negprobe counts per cell': 'mean_negprobe_counts_per_cell'})
        db_df = \
            db_df.astype({
                "fov_id": int,
                'mean_transcript_per_cell': float,
                'mean_unique_genes_per_cell': float,
                'number_non_empty_cells': int,
                'pct_non_empty_cells': float,
                'percentile_10_transcript_per_cell': float,
                'percentile_90_transcript_per_cell': float,
                'mean_negprobe_counts_per_cell': float})
        count_qc_data_for_db = db_df.to_dict(orient="records")
        return count_qc_data, count_qc_data_for_db
    except Exception as e:
        raise ValueError(f"Failed to get RNA QC for file {exprMat_file}, error: {e}") from e

In [None]:
%%capture
def get_protein_expr_counts(exprMat_file: str) -> Tuple[List[dict], List[dict]]:
    """Compute per-FOV count QC metrics from a protein expression matrix.

    The gzipped CSV is expected to have 'fov' and 'cell_ID' columns. Columns
    whose names contain "IgG" are treated as IgG controls, and every
    remaining column is treated as a protein target.

    :param exprMat_file: Path to the gzip-compressed expression matrix CSV
    :returns: A tuple of two lists with one dict per FOV:
        * display records with human-readable keys and formatted values
        * database records with snake_case keys and numeric values; these
          are parsed back from the formatted strings, so they carry the same
          rounding as the display records
    :raises ValueError: If the file cannot be read or processed, or if it
        contains no FOV rows
    """
    # Display label for the IgG control metric, kept exactly as the original
    # literal, which contains an embedded tab (written here as an escape).
    # NOTE(review): the tab looks like a copy/paste artefact from a table
    # header - confirm whether the label should be 'Mean IgG Control Intensity'.
    igg_label = 'Fluorescence Intensity\tMean IgG Control Intensity'
    try:
        expr_df = pd.read_csv(exprMat_file, compression="gzip")
        igG_cols = [c for c in expr_df.columns if 'IgG' in c]
        non_prot_cols = {'fov', 'cell_ID', *igG_cols}
        prot_cols = [c for c in expr_df.columns if c not in non_prot_cols]
        count_qc_data = []
        ## count per fov
        for fov_id, f_data in expr_df.groupby('fov'):
            total_counts = f_data[prot_cols].sum(axis=1)
            genes_per_cell = (f_data[prot_cols] > 0).sum(axis=1)
            # A cell is "non-empty" when its summed protein signal is positive.
            non_empty_cells = int((total_counts > 0).sum())
            count_qc_data.append({
                'FOV': fov_id,
                'Mean Fluorescence Intensity (MFI)': f"{total_counts.mean():.0f}",
                'Mean unique genes per cell': f"{genes_per_cell.mean():.0f}",
                'Number of Non-empty cells': non_empty_cells,
                'PCT of Non-empty cells': f"{non_empty_cells / len(f_data.index):.2f}",
                '10th percentile of Fluorescence Intensity': f"{total_counts.quantile(0.1):.2f}",
                '90th percentile of Fluorescence Intensity': f"{total_counts.quantile(0.9):.2f}",
                igg_label: f"{f_data[igG_cols].sum(axis=1).mean():.3f}"
            })
        if not count_qc_data:
            raise ValueError("no FOV rows found in expression matrix")
        ## build the database records once, after all FOVs are collected
        db_df = \
            pd.DataFrame(count_qc_data).rename(
                columns={
                    'FOV': "fov_id",
                    'Mean Fluorescence Intensity (MFI)': 'mean_fluorescence_intensity',
                    'Mean unique genes per cell': 'mean_unique_genes_per_cell',
                    'Number of Non-empty cells': 'number_non_empty_cells',
                    'PCT of Non-empty cells': 'pct_non_empty_cells',
                    '10th percentile of Fluorescence Intensity': 'percentile_10_fluorescence_intensity',
                    '90th percentile of Fluorescence Intensity': 'percentile_90_fluorescence_intensity',
                    igg_label: 'fluorescence_intensity_mean_igg_control_intensity'
                    })
        db_df = \
            db_df.astype({
                "fov_id": int,
                "mean_fluorescence_intensity": int,
                "mean_unique_genes_per_cell": int,
                "number_non_empty_cells": int,
                "pct_non_empty_cells": float,
                "percentile_10_fluorescence_intensity": float,
                "percentile_90_fluorescence_intensity": float,
                "fluorescence_intensity_mean_igg_control_intensity": float})
        count_qc_data_for_db = db_df.to_dict(orient="records")
        return count_qc_data, count_qc_data_for_db
    except Exception as e:
        raise ValueError(f"Failed to get Protein QC for file {exprMat_file}, error: {e}") from e

In [None]:
%%capture
def generate_count_qc_table(
    slide_flat_file_dir: str,
    slide_metadata_json_file: str,
    json_output_file: str ) -> Tuple[List[dict], str, str]:
    """Build the count QC table for a slide and write the DB records as JSON.

    The assay type read from the slide metadata selects the RNA or protein
    QC routine (compared case-insensitively).

    :param slide_flat_file_dir: Directory holding the slide's flat files
    :param slide_metadata_json_file: Slide metadata JSON file path
    :param json_output_file: Path the database-oriented records are written to
    :returns: A tuple of (display records, assay type, panel name)
    :raises ValueError: If the assay type is neither RNA nor protein
    """
    assay_type, panel_name = \
        get_assay_and_panel_info_from_slide_meatadata(
            slide_metadata_json_file)
    expr_mat_file = \
        get_expr_mat_file_for_slide(slide_flat_file_dir)
    assay_type_upper = assay_type.upper()
    if assay_type_upper == 'RNA':
        count_qc_data, count_qc_data_for_db = \
            get_rna_expr_counts(exprMat_file=expr_mat_file)
    elif assay_type_upper == 'PROTEIN':
        count_qc_data, count_qc_data_for_db = \
            get_protein_expr_counts(exprMat_file=expr_mat_file)
    else:
        raise ValueError(f"New assay type {assay_type} found in COSMX metadata flat file {slide_metadata_json_file}")
    with open(json_output_file, 'w') as fp:
        json.dump(count_qc_data_for_db, fp)
    return count_qc_data, assay_type, panel_name

In [None]:
# Run the count QC for this slide. The call returns the per-FOV display
# records together with the assay type and panel name, and writes the
# database-oriented records as JSON to the json_output_file path.
# NOTE(review): JSON_OUTPUT_DIR is opened for writing as a file inside
# generate_count_qc_table - confirm it holds a file path, not a directory.
count_qc_data, assay_type, panel_name = \
    generate_count_qc_table(
        slide_flat_file_dir=SLIDE_FLAT_FILE_DIR,
        slide_metadata_json_file=SLIDE_METADATA_JSON_FILE,
        json_output_file=JSON_OUTPUT_DIR)

In [None]:
# Show the assay type and panel name as a two-item Markdown bullet list.
Markdown(f"""* **Assay type:** {assay_type}
* **Panel name:** {panel_name}""")

In [None]:
# Render the per-FOV display records as an HTML table, omitting the row index.
HTML(pd.DataFrame(count_qc_data).to_html(index=False))