# Path Foundation Linear Probe Demo
The ipynb is a demonstration of using the Path Foundation API (this API computes embeddings for histopathology images).
The contents include how to:
Train a linear model using the embeddings
Note: This notebook is for API demonstration purposes only.
As with machine-learning use-cases it is critical to consider evaluation datasets that reflect the expected distribution of the intended use case.

#Prerequisites
You must have access to the Pathology Foundation Tool. See the project's [README](https://github.com/Google-Health/imaging-research/blob/master/path-foundation/README.md) for details.




# Train a Linear Classifier from Pregenerated Embeddings

In [None]:
# Constants


PROJECT_ID = (  # Project that contains the stored embeddings
    'hai-cd3-foundations'
)
BUCKET_NAME = (  # Bucket that contains the embeddings
    'hai-cd3-foundations-pathology-vault-entry'
)
DATASET_PROJECT_ID = 'hai-cd3-foundations'  # @param {type: 'string'}
DATASET_LOCATION = 'us-west1'  # @param {type: 'string'}
DATASET_ID = 'pathology' # @param {type: 'string'}
STORE_ID = 'camelyon'  # @param {type: 'string'}
EMBEDDING_MASK_DIR_NAME = 'embeddings/'  # @param {type: 'string'}
EVAL_CANCER_FILE = 'eval_cancer_embeddings.json'  # @param {type: 'string'}
EVAL_NON_CANCER_FILE = 'eval_non_cancer_embeddings.json'  # @param {type: 'string'}
TRAINING_CANCER_FILE = 'training_cancer_embeddings.json'  # @param {type: 'string'}
TRAINING_NON_CANCER_FILE = 'training_non_cancer_embeddings.json'  # @param {type: 'string'}
TRAINING_SET_PATCH_COUNT = 250  # @param {type: 'integer'}
EVAL_SET_PATCH_COUNT = 50  # @param {type: 'integer'}

In [None]:
from google.colab import auth
# Authenticate user for access. There will be a popup asking you to sign in with your user account and approve access.
auth.authenticate_user()

In [None]:
!pip install scikit-plot
!pip install ez-wsi-dicomweb
!pip install hcls-imaging-ml-toolkit-ez-wsi


In [None]:
### Imports

from dataclasses import dataclass
from enum import Enum
import json
from typing import Any
from ez_wsi_dicomweb import dicom_slide
from ez_wsi_dicomweb import dicom_web_interface
import ez_wsi_dicomweb
import ez_wsi_dicomweb.dicomweb_credential_factory as dicomweb_credential_factory
import ez_wsi_dicomweb.pixel_spacing as pixel_spacing
import hcls_imaging_ml_toolkit.dicom_path as dicom_path
import hcls_imaging_ml_toolkit.tags as tags
import google.cloud.storage
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scikitplot as skplt
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection
import sklearn.pipeline
import sklearn.preprocessing
from google.cloud import storage

In [None]:
# Defines a helper Dataclass for converting Embeddings from JSON


@dataclass
class Embedding:
  # Assumes patch size is always 224 (so width and height of a patch are 224 pixels)
  dicom_study_uid: str
  dicom_series_uid: str
  x_origin: int  # Top left
  y_origin: int  # Top left
  instance_uids: list[str]
  embedding: list[float]

  def to_json(self) -> str:
    return json.dumps({
        'dicom_study_uid': self.dicom_study_uid.strip(),
        'dicom_series_uid': self.dicom_series_uid.strip(),
        'x_origin': self.x_origin,
        'y_origin': self.y_origin,
        'instance_uids': self.instance_uids,
        'embedding': json.dumps(self.embedding),
    })


@dataclass
class Embeddings:
  embeddings: list[Embedding]

  def to_json(self) -> str:
    return json.dumps([em.construct_json_object() for em in self.embeddings])

  def concat(self,other):
    return Embeddings(embeddings=self.embeddings+other.embeddings)


def embedding_from_json(json_str: str) -> Embedding:
  json_object = json.loads(json_str)
  return Embedding(
      dicom_study_uid=json_object['dicom_study_uid'],
      dicom_series_uid=json_object['dicom_series_uid'],
      x_origin=json_object['x_origin'],
      y_origin=json_object['y_origin'],
      instance_uids=json_object['instance_uids'],
      embedding=json.loads(json_object['embedding']),  # Deserialize embedding
  )


def embeddings_from_json(json_str: str) -> Embeddings:
  json_objects = json.loads(json_str)
  return Embeddings(
      embeddings=[embedding_from_json(obj) for obj in json_objects]
  )

In [None]:
# Downloads the pre-generated Embeddings for the Training and Eval sets
client = storage.Client(project=PROJECT_ID)
bucket = client.bucket(BUCKET_NAME)


def download_and_convert_embeddings(blob_path: str):
  """Downloads a blob and converts JSON to dataclass"""
  json_data = (
      client.bucket(BUCKET_NAME).get_blob(blob_path).download_as_string()
  )
  return embeddings_from_json(json_data)


# Downloads embeddings
eval_cancer_embeddings = download_and_convert_embeddings(
    EMBEDDING_MASK_DIR_NAME + EVAL_CANCER_FILE
)
eval_non_cancer_embeddings = download_and_convert_embeddings(
    EMBEDDING_MASK_DIR_NAME + EVAL_NON_CANCER_FILE
)
training_cancer_embeddings = download_and_convert_embeddings(
    EMBEDDING_MASK_DIR_NAME + TRAINING_CANCER_FILE
)
training_non_cancer_embeddings = download_and_convert_embeddings(
    EMBEDDING_MASK_DIR_NAME + TRAINING_NON_CANCER_FILE
)
eval_embeddings_obj = eval_cancer_embeddings.concat(eval_non_cancer_embeddings)

In [None]:
PATCH_SIZE = 224
TARGET_PIXEL_SPACING = pixel_spacing.PixelSpacing.FromMagnificationString('40X')
dcf = dicomweb_credential_factory.CredentialFactory()
dwi = dicom_web_interface.DicomWebInterface(dcf)


def render_patch_from_embedding(
    embedding: Embedding, plot_name: str = ''
):

  series_path = dicom_path.Path(
      project_id=DATASET_PROJECT_ID,
      location=DATASET_LOCATION,
      dataset_id=DATASET_ID,
      store_id=STORE_ID,
      study_uid=embedding.dicom_study_uid,
      series_uid=embedding.dicom_series_uid,
  )
  ds = dicom_slide.DicomSlide(
      dwi=dwi, path=series_path, enable_client_slide_frame_decompression=True
  )
  patch_bytes = ds.get_patch(
      pixel_spacing=TARGET_PIXEL_SPACING,
      x=embedding.x_origin,
      y=embedding.y_origin,
      width=PATCH_SIZE,
      height=PATCH_SIZE,
  ).image_bytes()
  plt.imshow(patch_bytes)
  plt.title(plot_name)
  plt.axis('off')
  plt.show()


render_patch_from_embedding(eval_cancer_embeddings.embeddings[0], 'Eval Cancer')

In [None]:
# Pass the Embeddings for the Training and Eval sets into scikit-learn


def concatenate_embeddings(embeddings_obj: Embeddings) -> np.array:
  """Concatenates embeddings into a NumPy array."""
  return np.array(
      [embedding.embedding for embedding in embeddings_obj.embeddings]
  )


def concatenate_series_ids(embeddings_obj: Embeddings) -> np.array:
  """Concatenates instance UIDs into a NumPy array."""
  # Assume there is one instance uid per series.
  return np.array(
      [embedding.instance_uids[0] for embedding in embeddings_obj.embeddings]
  )


# Create NumPy arrays directly (more efficient)
training_embeddings = np.concatenate([
    concatenate_embeddings(training_cancer_embeddings),
    concatenate_embeddings(training_non_cancer_embeddings),
])
eval_embeddings = np.concatenate([
    concatenate_embeddings(eval_cancer_embeddings),
    concatenate_embeddings(eval_non_cancer_embeddings),
])

training_ids = np.concatenate([
    concatenate_series_ids(training_cancer_embeddings),
    concatenate_series_ids(training_non_cancer_embeddings),
])
eval_ids = np.concatenate([
    concatenate_series_ids(eval_cancer_embeddings),
    concatenate_series_ids(eval_non_cancer_embeddings),
])

# Create labels (already done in the previous question)
training_labels = np.concatenate(
    (np.ones(TRAINING_SET_PATCH_COUNT), np.zeros(TRAINING_SET_PATCH_COUNT))
)
eval_labels = np.concatenate(
    (np.ones(EVAL_SET_PATCH_COUNT), np.zeros(EVAL_SET_PATCH_COUNT))
)

In [None]:
# Train a linear classifier
clf_pipeline = sklearn.pipeline.Pipeline([
    ('scaler', sklearn.preprocessing.StandardScaler()),
    (
        'logreg',
        sklearn.model_selection.GridSearchCV(
            sklearn.linear_model.LogisticRegression(
                random_state=0,
                multi_class='ovr',
                verbose=False,
            ),
            cv=sklearn.model_selection.StratifiedGroupKFold(n_splits=5).split(
                training_embeddings, y=training_labels, groups=training_ids
            ),
            param_grid={'C': np.logspace(start=-4, stop=4, num=10, base=10)},
            scoring='roc_auc_ovr',
            refit=True,
        ),
    ),
]).fit(training_embeddings, training_labels)


test_predictions = clf_pipeline.predict_proba(eval_embeddings)

In [None]:
# Evaluate the linear classifiers performance using the eval set

sklearn.metrics.roc_auc_score(eval_labels, test_predictions[:, 1])

In [None]:
# @title Plot the ROC Curve

display = sklearn.metrics.RocCurveDisplay.from_predictions(eval_labels, test_predictions[:,1],name="Tumor Classifier")
display.ax_.set_title("ROC of Tumor Classifier")

In [None]:
# @title Find a threshold that optimises for the sum of sens and spec

thresholds = np.linspace(0, 1, 100)
sensitivities = []
specificities = []
for threshold in thresholds:
    predictions = (test_predictions[:, 1] > threshold)
    sensitivities.append(sklearn.metrics.recall_score(eval_labels, predictions))
    specificities.append(sklearn.metrics.recall_score(eval_labels == 0, predictions == 0))
index = np.argmax(np.array(sensitivities) + np.array(specificities))
best_threshold = thresholds[index]
sens = sensitivities[index]
spec = specificities[index]
print(f"Best threshold: {round(best_threshold,2)}. Sensitivity is {round(sens*100,2)}% and Specificity is {round(spec*100,2)}% ")

In [None]:
# @title Show the results in a table

df = pd.DataFrame({'ground_truth':eval_labels,'model_score':test_predictions[:,1]})
df['tumor_prediction'] = df['model_score'] > best_threshold
df['embeddings'] = [embedding for embedding in eval_embeddings_obj.embeddings]

df

In [None]:
# @title Visualize True Positives

df_tp = df[(df['tumor_prediction'] == True) & (df['ground_truth'] == 1)].sort_values('model_score',ascending=False).head(5)
for _, row in df_tp.iterrows():
  print(f'model score is {row.model_score}')
  render_patch_from_embedding(row.embeddings, f'True Positive')

In [None]:
# @title Visualize True Negatives

df_tn = df[(df['tumor_prediction'] == False) & (df['ground_truth'] == 0)].sort_values('model_score',ascending=False).head(5)
for _, row in df_tn.iterrows():
  print(f'model score is {row.model_score}')
  render_patch_from_embedding(row.embeddings, f'True Negative')

In [None]:
# @title Visualize False Positives

df_fp = df[(df['tumor_prediction'] == True) & (df['ground_truth'] == 0)].sort_values('model_score',ascending=False)
for _, row in df_fp.iterrows():
  print(f'model score is {row.model_score}')
  render_patch_from_embedding(row.embeddings, f'False Positive')

In [None]:
# @title Visualize False Negatives

df_fn = df[(df['tumor_prediction'] == False) & (df['ground_truth'] == 1)].sort_values('model_score',ascending=True)
for _, row in df_fn.iterrows():
  print(f'model score is {row.model_score}')
  render_patch_from_embedding(row.embeddings, f'False Negative')