In [None]:
%pip install --quiet apache-beam[gcp,dataframe]

In [None]:
%pip install --quiet fsspec[gcs]

In [1]:
# These imports will fail unless you restart the runtime after pip install.
import os
import subprocess
import apache_beam as beam
import apache_beam.dataframe.convert
import apache_beam.dataframe.io
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from PIL import Image as PilImage
import pandas as pd
import numpy as np
import fsspec
import tensorflow as tf
import pyarrow

In [2]:
# Length of the embedding vector for one image. Schema needs to know in advance.
# Must equal the width of Embedder's model output; to_row() checks this for
# every element.
EMB_LEN = 2048

class Embedder(beam.DoFn):
  """Adds a ResNet50 embedding to each element.

  Each input element is a dict with an 'image' entry (a 2-D uint16 numpy array)
  and an 'image_path' entry (used only in error messages). Each output element
  is a shallow copy of the input with an added 'embedding' entry: the model
  output for that image with the batch dimension removed.
  """

  def __init__(self):
    super().__init__()
    # Built on the worker in setup(); None until then so the DoFn pickles
    # without a Keras model attached.
    self._model = None

  def setup(self):
    """Builds the Keras model: uint16 grayscale in, pooled ResNet50 features out."""
    import tensorflow as tf
    # Input should be grayscale (not RGB) images. They should be uint16.
    # Input shape excludes the batch dim, so tensors are (batch, height, width).
    x = tf.keras.layers.Input([None, None], dtype=tf.uint16)
    # convert_image_dtype rescales the integer range (uint16 -> uint8), then the
    # cast gives float values in [0, 255] for ResNet50's preprocess_input.
    x = tf.image.convert_image_dtype(x, 'uint8')
    x = tf.cast(x, tf.float32)
    x = x[..., tf.newaxis]  # Add channels dimension.
    x = tf.image.resize(x, [224, 224])
    x = tf.image.grayscale_to_rgb(x)
    x = tf.keras.applications.resnet50.preprocess_input(x)
    self._model = tf.keras.applications.resnet50.ResNet50(
            input_tensor=x,
            input_shape=(224, 224, 3),
            include_top=False,
            weights="imagenet",
            pooling='avg')

  def process(self, elem):
    """Yields a copy of `elem` with its 'embedding' added.

    Raises:
      ValueError: If the image is not 2-D or not uint16.
    """
    import numpy as np
    if self._model is None:
      # setup() normally runs before process(); build the model here if not.
      self.setup()
    img = elem['image']
    if len(img.shape) != 2:
      raise ValueError('Grayscale image expected but shape was %s. img_path=%s' %
                       (str(img.shape), elem['image_path']))
    if img.dtype != np.uint16:
      raise ValueError('Model expects uint16 image but was %s. img_path=%s' %
                       (img.dtype, elem['image_path']))
    batch = np.expand_dims(img, axis=0)  # Make batch dimension
    # TODO: Consider making a batching DoFn for better efficiency.
    # verbose=0: avoid printing a progress bar for every single element.
    emb = self._model.predict(batch, verbose=0)
    # Beam forbids mutating input elements, so emit a copy.
    out = dict(elem)
    out['embedding'] = emb[0]  # Remove batch dimension
    return [out]


def load_img(elem):
  """Returns a copy of `elem` with the image at elem['image_path'] decoded.

  The file is opened through fsspec, so both local paths and gs:// URLs work.
  It is decoded with PIL, and the pixels are stored under 'image' as a numpy
  array. The input dict is left untouched because Beam forbids mutating input
  elements.
  """
  import fsspec
  from PIL import Image as PilImage
  import numpy as np
  path = elem['image_path']
  with fsspec.open(path, mode='rb') as f, PilImage.open(f) as im:
    # np.asarray forces PIL to decode while the file is still open.
    array = np.asarray(im)
  out = dict(elem)
  out['image'] = array
  return out


def to_row(elem, metadata_fields, emb_fields):
  """Flattens a processed element into one output row.

  The keys of the returned dict must match the Parquet schema built in
  make_pipeline: metadata fields first, then one column per embedding value.

  Args:
    elem: Dict containing every key in metadata_fields plus an 'embedding'
      sequence.
    metadata_fields: Keys copied unchanged from elem, in this order.
    emb_fields: Column names for the embedding values, one per value.

  Returns:
    Dict mapping each metadata field to its value and each emb_fields name to
    the corresponding embedding value.

  Raises:
    ValueError: If len(elem['embedding']) != len(emb_fields).
  """
  row = {k: elem[k] for k in metadata_fields}
  embedding = elem['embedding']
  if len(emb_fields) != len(embedding):
    raise ValueError('Expected embedding length %d but it is %d.' %
                     (len(emb_fields), len(embedding)))
  row.update(zip(emb_fields, embedding))
  return row


def make_pipeline(image_metadata_path,
                  output_path,
                  num_output_shards=None,
                  options=None):
  """Builds (but does not run) the image-embedding Beam pipeline.

  The pipeline reads the metadata CSV, loads each image (load_img), computes its
  embedding (Embedder) and writes Parquet rows. Each row holds every metadata
  column as a string, followed by EMB_LEN float32 columns emb_0000, emb_0001, ...

  Args:
    image_metadata_path: CSV path or URL readable by fsspec and by Beam. Must
      contain an 'image_path' column.
    output_path: Prefix for the output Parquet shard files.
    num_output_shards: Passed to WriteToParquet as num_shards; None leaves the
      choice to Beam.
    options: Optional PipelineOptions for beam.Pipeline.

  Returns:
    The constructed, not yet started, beam.Pipeline.

  Raises:
    ValueError: If 'image_path' is missing, or if a metadata column name
      collides with a field the pipeline adds ('image', 'embedding' or an
      emb_NNNN column).
  """
  # Read the header of the image_metadata csv file to know the columns.
  with fsspec.open(image_metadata_path, mode='rb') as f:
    metadata_fields = list(pd.read_csv(f, nrows=1).columns)
  if 'image_path' not in metadata_fields:
    raise ValueError('Missing column image_path')

  # Beam needs to know the schema in advance. Can't be automatically determined
  # since the length of the embedding is a variable.
  emb_fields = ['emb_%04d' % i for i in range(EMB_LEN)]
  # load_img/Embedder add 'image'/'embedding' to each element, and the embedding
  # becomes the emb_NNNN columns. A metadata column with any of these names would
  # be overwritten or duplicated. Fail here, before the pipeline is launched.
  reserved = {'image', 'embedding'} | set(emb_fields)
  clashes = sorted(reserved.intersection(metadata_fields))
  if clashes:
    raise ValueError('Metadata columns collide with pipeline fields: %s' %
                     clashes)
  schema = pyarrow.schema([(x, pyarrow.string()) for x in metadata_fields] +
                          [(x, pyarrow.float32()) for x in emb_fields])

  p = beam.Pipeline(options=options)
  # NOTE(review): with dtype=str, empty CSV cells are presumably read as NaN,
  # which may fail to convert to the string schema at write time - confirm
  # inputs have no blank cells.
  beam_df = p | beam.dataframe.io.read_csv(image_metadata_path, dtype=str)
  p_metadata = (beam.dataframe.convert.to_pcollection(beam_df) |
                'row_to_dict' >> beam.Map(lambda x: dict(x._asdict())) |
                # Reshuffle prevents fusion with the CSV read, so the costly
                # per-image steps below can be spread over workers.
                'Reshuffle1' >> beam.Reshuffle())
  p_image = p_metadata | beam.Map(load_img)
  p_emb = p_image | 'compute_embeddings' >> beam.ParDo(Embedder())
  p_rows = (p_emb |
            beam.Map(to_row, metadata_fields, emb_fields) |
            'Reshuffle2' >> beam.Reshuffle())
  _ = p_rows | beam.io.parquetio.WriteToParquet(output_path,
                                                schema,
                                                num_shards=num_output_shards)

  return p

def run_pipeline(image_metadata_path,
                 output_path,
                 num_output_shards=None,
                 options=None):
  """Builds the pipeline with make_pipeline() and starts it.

  If the result has a `_job` attribute (a Cloud Dataflow launch), the job's
  Cloud Console URL is printed.

  Returns:
    The result object returned by Pipeline.run().
  """
  result = make_pipeline(image_metadata_path,
                         output_path,
                         num_output_shards=num_output_shards,
                         options=options).run()
  if not hasattr(result, '_job'):
    # Not a Dataflow launch: there is no console URL to show.
    return result
  job = result._job
  print('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' %
        (job.location, job.id, job.projectId))
  return result

**Do a local test run**

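The pipeline expects one 16-bit grayscale image per stain. The paths to these images and any associated labels are given in a CSV file. That file must contain an `image_path` column; every other column is passed through to the output as string metadata.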

Create an example csv file and example images to do a test run of the pipeline.

In [3]:
%%writefile example_image_metadata.csv
image_path,batch,plate,well,site,stain
im1.tiff,batch_01,plate_02,A05,42,DAPI
im2.tiff,batch_01,plate_02,A05,42,RNA

Writing example_image_metadata.csv


In [4]:
example_image_metadata_df = pd.read_csv('example_image_metadata.csv')

np.random.seed(12345) # For determinism.

def write_test_image(image_path):
  """Writes a random 10x20 uint16 grayscale TIFF to `image_path`."""
  # randint's upper bound is exclusive; 65536 covers the full uint16 range.
  array = np.random.randint(0, 65536, size=(10, 20), dtype='uint16')
  pil_image = PilImage.fromarray(array)
  with open(image_path, 'wb') as f:
    pil_image.save(f, 'tiff')
  print('Wrote to %s' % image_path)

# A plain loop: this is done only for its side effects (writing files).
for test_image_path in example_image_metadata_df['image_path']:
  write_test_image(test_image_path)

Wrote to im1.tiff
Wrote to im2.tiff


In [None]:
# Request one shard explicitly, because the file name read below assumes it.
# Block until the run finishes so the output exists before it is read.
local_result = run_pipeline('example_image_metadata.csv', 'example_outfile',
                            num_output_shards=1)
local_result.wait_until_finish()

pd.read_parquet('example_outfile-00000-of-00001')



Unnamed: 0,image_path,batch,plate,well,site,stain,emb_0000,emb_0001,emb_0002,emb_0003,...,emb_2038,emb_2039,emb_2040,emb_2041,emb_2042,emb_2043,emb_2044,emb_2045,emb_2046,emb_2047
0,im1.tiff,batch_01,plate_02,A05,42,DAPI,0.068733,0.345606,0.0,0.0,...,0.710415,0.141535,0.0,0.001137,0.0,0.0,0.0,0.151024,0.0,0.029063
1,im2.tiff,batch_01,plate_02,A05,42,RNA,0.044517,1.047723,0.0,0.0,...,0.884657,0.460547,0.065838,0.005723,0.0,0.0,0.0,0.0,0.0,0.0


**Launch on Cloud Dataflow**

In [None]:
# Authenticate this Colab session with Google Cloud. The GCS and Dataflow
# cells below presumably rely on these credentials.
from google.colab import auth

auth.authenticate_user()

In [None]:
# Fill these in before launching on Dataflow (Colab form fields).
# GCS_PROJECT: Google Cloud project the Dataflow job runs in.
# GCS_BUCKET: bucket name without gs://; Dataflow staging/temp files go under
#   gs://<bucket>/dataflow/.
# GCS_REGION: Dataflow region.
# GCS_ROOT_DIR: working directory for the example inputs and outputs. It is set
#   separately and is not derived from GCS_BUCKET. Keep its trailing '/'.
GCS_PROJECT = 'YOUR_PROJECT' # @param { type: "string", isTemplate: true}
GCS_BUCKET = 'YOUR_BUCKET' # @param { type: "string", isTemplate: true}
GCS_REGION = 'YOUR_REGION' # @param { type: "string", isTemplate: true}
GCS_ROOT_DIR = 'gs://YOUR_BUCKET/test_embedding/' # @param { type: "string", isTemplate: true}

In [None]:
# Copy example input data to cloud. The uploaded metadata CSV points at the GCS
# copies of the images so the Dataflow workers can read them.
gcs_example_image_metadata_df = example_image_metadata_df.copy()
gcs_example_image_metadata_df['image_path'] = [
    os.path.join(GCS_ROOT_DIR, local_path)
    for local_path in example_image_metadata_df['image_path']]
gcs_example_image_metadata_df.to_csv('gcs_example_image_metadata.csv', index=False)
gcs_image_metadata_path = os.path.join(GCS_ROOT_DIR, 'example_image_metadata.csv')

# Upload the metadata CSV plus every image listed in it.
# Argument lists avoid shell interpolation, and check=True stops the notebook
# if an upload fails instead of continuing silently.
uploads = [('gcs_example_image_metadata.csv', gcs_image_metadata_path)]
uploads.extend(zip(example_image_metadata_df['image_path'],
                   gcs_example_image_metadata_df['image_path']))
for src, dst in uploads:
  subprocess.run(['gsutil', 'cp', src, dst], check=True)

In [None]:
%%writefile requirements.txt
apache-beam[gcp,dataframe]>=2.38.0
fsspec[gcs]>=2022.3.0
numpy>=1.21.6
pandas>=1.3.5
Pillow>=9.1.0
tensorflow

In [None]:
def get_pipeline_options(project, bucket, region):
  """Returns cloud dataflow pipeline options.

  Args:
    project: Google Cloud project ID for the job.
    bucket: GCS bucket name without the gs:// prefix. Staging and temp files
      are placed under gs://<bucket>/dataflow/.
    region: Dataflow region for the job.
  """
  flags = [
      '--requirements_file', 'requirements.txt',
      '--runner', 'DataflowRunner',
      # Flag use_runner_v2 avoids a segfault when worker pool starts.
      # Probably not needed long term.
      '--experiments', 'use_runner_v2',
  ]
  options = pipeline_options.PipelineOptions(flags=flags)
  # All views of one PipelineOptions share the same underlying values.
  gcp = options.view_as(pipeline_options.GoogleCloudOptions)
  gcp.project = project
  gcp.region = region
  dataflow_root = 'gs://%s/dataflow' % bucket
  gcp.staging_location = '%s/staging' % dataflow_root
  gcp.temp_location = '%s/temp' % dataflow_root
  return options

options = get_pipeline_options(GCS_PROJECT, GCS_BUCKET, GCS_REGION)

In [None]:
# Launch on Dataflow. Output shards are named <gcs_output_path>-SSSSS-of-NNNNN.
gcs_output_path = os.path.join(GCS_ROOT_DIR, 'output')
num_output_shards = 1 # Increase for larger datasets.

result = run_pipeline(
    image_metadata_path=gcs_image_metadata_path,
    output_path=gcs_output_path,
    num_output_shards=num_output_shards,
    options=options,
)

In [None]:
# Dataflow runs asynchronously: block until the job finishes (this raises if
# the job failed), then load every output shard into a single DataFrame.
result.wait_until_finish()
shard_frames = []
for shard_index in range(num_output_shards):
  shard_path = gcs_output_path + ('-%05d-of-%05d' % (shard_index, num_output_shards))
  with fsspec.open(shard_path) as f:
    shard_frames.append(pd.read_parquet(f))
df = pd.concat(shard_frames, ignore_index=True)
df