# Dicom parsing & compute goes wrong

- This pipeline shows you how multiple things can go wrong and make a pipeline fail using the DICOM format

- We are also using Gemini to give us an initial diagnose that in real life would need to be validated by a doctor

The NIH Chest X-ray dataset consists of 100,000 de-identified images of chest x-rays in PNG format, provided by NIH Clinical Center

We are using the following datasource: gs://srtt-healthcare-nih-chest-xray

### Instructions
1. Update the project_id, gcs_data_path, temp_location, region
2. We need a lot of files from this bucket gs://gcs-public-data--healthcare-nih-chest-xray/dicom/*.dcm copied into a local bucket of yours: gcs_data_path (use the cell for that)
3. Authenticate with your user so that you can submit your job directly from the notebook (for Colab)

### What to do
- Try to use a few dicon files (changing the beam.create statement)
- Check for things that don’t look right
- Correct the missing code to have gemini do adiagnose over the image or image + meta data
  







In [1]:
!pip install apache_beam[gcp]>=2.50
!pip install pydicom
!pip install pillow
!pip install numpy
!pip install --upgrade google-cloud-aiplatform

Collecting pydicom
  Downloading pydicom-3.0.0-py3-none-any.whl.metadata (9.4 kB)
Downloading pydicom-3.0.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m16.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pydicom
Successfully installed pydicom-3.0.0
Collecting google-cloud-aiplatform
  Downloading google_cloud_aiplatform-1.67.1-py2.py3-none-any.whl.metadata (32 kB)
Downloading google_cloud_aiplatform-1.67.1-py2.py3-none-any.whl (5.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.2/5.2 MB[0m [31m46.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: google-cloud-aiplatform
  Attempting uninstall: google-cloud-aiplatform
    Found existing installation: google-cloud-aiplatform 1.66.0
    Uninstalling google-cloud-aiplatform-1.66.0:
      Successfully uninstalled google-cloud-aiplatform-1.66.0
Successfully installed google-cloud-aiplatform-1.67.1


In [1]:
#@title Colab notebook variables { run: "auto", display-mode: "form" }
PROJECT_ID = "sfsc-srtt-shared" #@param {type:"string"}
GCS_DATA_PATH = "gs://srtt-healthcare-nih-chest-xray" #@param {type:"string"}
TEMP_LOCATION = "gs://sfsc-df/temp/" #@param {type:"string"}
REGION = "us-central1" #@param {type:"string"}

In [2]:
from google.colab import auth
auth.authenticate_user()

In [7]:
!gcloud config set $PROJECT_ID
#copy necessary files
!gsutil -m -u $PROJECT_ID cp gs://gcs-public-data--healthcare-nih-chest-xray/dicom/*.dcm $GCS_DATA_PATH

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000165_002.dcm [Content-Type=application/octet-stream]...
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000165_003.dcm [Content-Type=application/octet-stream]...
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000165_004.dcm [Content-Type=application/octet-stream]...
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000165_005.dcm [Content-Type=application/octet-stream]...
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000165_006.dcm [Content-Type=application/octet-stream]...
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000165_007.dcm [Content-Type=application/octet-stream]...
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000165_008.dcm [Content-Type=application/octet-stream]...
Copying gs://gcs-public-data--healthcare-nih-chest-xray/dicom/00000166_000.dcm [C

In [8]:
import pydicom
from PIL import Image
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
import numpy as np
import io

import base64
import vertexai
from vertexai.generative_models import GenerativeModel, Part, SafetySetting

#Extract DICOM data
class ReadDicom(beam.DoFn):
  def __init__(self):
      print('init')
  #def start_bundle(self):
  #     print('start_bundle')
  #def finish_bundle(self):
  #     print('finish_bundle'
  def process(self, path):
      ds = pydicom.dcmread(path)
      metadata ={"patientId":ds.PatientID, "name":str(ds.PatientName)[0:5], "Patient​Sex":ds.PatientSex, "studyId":ds.StudyId, "studyDate":ds.studyDate}
      imgdata = {"patientId":ds.PatientID, "studyId":ds.StudyId, "Patient​Sex":ds.PatientSex, "img2d": ds.pixel_array.astype(float)}

      yield beam.pvalue.TaggedOutput('meta',metadata)
      yield beam.pvalue.TaggedOutput('img', imgdata )

# convert to JPEG
class ConvertImg(beam.DoFn):
  def process(self, rec):
    image_2d_scaled = (np.maximum(rec['img2d'],0) / rec['img2d'].max()) *255.0

    image_2d_scaled = np.uint8(image_2d_scaled)
    img = Image.fromarray(image_2d_scaled)
    with io.BytesIO() as buf:
      return img.save(buf, 'jpeg')


#call gemini
class CallVertexAIGeminiModel(beam.DoFn):
  m = None
  def setup(self):
     vertexai.init(project=PROJECT_ID, location=REGION)
     print("all setup")

  def process(self, filecontent):
    m = GenerativeModel(
        "gemini-1.5-flash-001",
    )

    prompt = """
    for study purpose, you are showing student how to diagnose illness by looking at chest x-rays, based on the following image could you give us a diagnose ?
    """

    img_file =  filecontent[1].read(mime_type="img/jpeg")
    contents = [Part.from_data(img_file, mime_type=="img/jpeg"), prompt]
    response = m.generate_content(contents)

    yield response.text

####nain pipeline
beam_options = PipelineOptions(
    runner='DataflowRunner',
    max_num_workers=10,
    project=PROJECT_ID,
    job_name='apple-workshop2',
    temp_location=TEMP_LOCATION,
    region=REGION
    )

with beam.Pipeline(options=beam_options) as p:
   out1 = (p | "search files" >>  beam.Create([GCS_DATA_PATH + "/*"])
          |  fileio.MatchAll()
          |  fileio.ReadMatches()
          |  "read Dicom" >> beam.ParDo(ReadDicom()).with_outputs()
   )

   #process img
   img_processed =( out1.img
                   | 'set keys img' >> beam.util.WithKeys(lambda x: x.metadata.path)
                   | 'convert to jpeg' >> beam.ParDo(ConvertImg())
   )

   #process metadata
   meta_processed =( out1.meta
                   | 'set keys meta' >> beam.util.WithKeys(lambda x: x['Patient​Sex'])
   )


   #Join the pictures & meta data to get the diagnostic
   joined_bc_bd_pv = ({'img':img_processed, 'meta': meta_processed}
                        |'Merge img & meta' >> beam.CoGroupByKey()
                        | 'display' >> beam.ParDo(print)
                      #  | 'call Gemini' >> beam.ParDo(CallVertexAIGeminiModel())

   )

#https://www.oreilly.com/online-learning/sep2024-reveal-your-deal.html?code=FALLSAVINGS24&utm_medium=email

ERROR:pydicom.pixels.utils:No module named 'pylibjpeg'
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pydicom/pixels/utils.py", line 1274, in _passes_version_check
    module = importlib.import_module(package_name, "__version__")
  File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'pylibjpeg'
ERROR:pydicom.pixels.utils:No module named 'pylibjpeg'
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pydicom/pixels/utils.py", line 1274, in _passes_version_check
    module = importlib.import_module(package_name, "__version__")
  File "/usr/lib/python3.10/importlib/__init__.py", line 126, i



init


ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-09-19T10:01:15.437Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 687, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1671, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "<ipython-input-8-60d2e0d58552>", line 22, in process
NameError: name 'pydicom' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
    response = task()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 687, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1671, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "<ipython-input-8-60d2e0d58552>", line 22, in process
NameError: name 'pydicom' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
    response = task()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 567, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1586, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 687, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1681, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1794, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1586, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 687, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1681, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1794, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1586, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 687, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1681, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1794, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1586, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 687, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1681, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1794, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 687, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1671, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "<ipython-input-8-60d2e0d58552>", line 22, in process
NameError: name 'pydicom' is not defined [while running '[8]: read Dicom/ParDo(ReadDicom)-ptransform-89']
