<a href="https://colab.research.google.com/github/mauro-nievoff/MultiCaRe_Dataset/blob/main/3_Turning_Captions_into_Image_Labels.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Turning Image Captions into Structured Data

The text found in image captions can be used to create labels for their corresponding images. This is done in three steps:

1. Caption Pre-Processing
2. Data Extraction using Spark NLP
3. Data Normalization

## 1. Caption Pre-Processing

The main purpose of this step is to split captions that contain references to different images, and then assign each part of the caption to the correct referenced image. Let's take a look at the sample caption below:

In [None]:
sample_caption = '''Brain CT scan. There is a mass in the frontal lobe (A-C) and an intracerebral hemorrhage in the right parietotemporal lobe (C and D).'''

This caption has three parts:
- `Brain CT scan.`: Initial statement without explicit references. This part of the caption refers to all the parts of the image (A to D).
- `There is a mass in the frontal lobe`: A statement with a range reference (A-C). It refers to the image parts A, B and C.
- `and an intracerebral hemorrhage in the right parietotemporal lobe`: This statement refers to the image parts C and D.
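
The target of the pre-processing step can be written down by hand for this sample. The sketch below simply pairs each caption part with the image parts listed above and groups the text per image part; the names `parts` and `expected` are only illustrative and are not used elsewhere in the notebook.

```python
# Each caption part paired with the image parts it refers to (as described above).
parts = [
    ('Brain CT scan.', ['A', 'B', 'C', 'D']),
    ('There is a mass in the frontal lobe', ['A', 'B', 'C']),
    ('and an intracerebral hemorrhage in the right parietotemporal lobe', ['C', 'D']),
]

# Group the caption parts by image part, preserving their original order.
expected = {}
for text, refs in parts:
    for ref in refs:
        expected.setdefault(ref, []).append(text)

for ref, texts in expected.items():
    print(ref, '->', ' '.join(texts))
```

Image part C is the only one that receives all three caption parts, which is the behavior the functions below aim to reproduce.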

### Secondary Functions

The `classify_chunks()` function splits a given text into smaller pieces (chunks), and then classifies those chunks as 'reference' (e.g. A or C), 'split' (e.g. special characters such as commas or dots) or 'other' (any other chunk).

In [None]:
import re

In [None]:
def classify_chunks(text):

  split_text = re.split(r'([;:./(/),]|-| and | to )', text)
  reference_tokens = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L']

  chunk_dicts = []
  for chunk in split_text:
    if chunk.strip() in reference_tokens:
      chunk_dicts.append({'chunk': chunk, 'token_type': 'reference'})
    elif chunk.strip() in [';', ':', '(', ')', ',', '.', 'and', 'to', '-']:
      chunk_dicts.append({'chunk': chunk, 'token_type': 'split'})
    else:
      chunk_dicts.append({'chunk': chunk, 'token_type': 'other'})
  return chunk_dicts

In [None]:
chunk_dicts = classify_chunks(sample_caption)
chunk_dicts[:10]

[{'chunk': 'Brain CT scan', 'token_type': 'other'},
 {'chunk': '.', 'token_type': 'split'},
 {'chunk': ' There is a mass in the frontal lobe ', 'token_type': 'other'},
 {'chunk': '(', 'token_type': 'split'},
 {'chunk': 'A', 'token_type': 'reference'},
 {'chunk': '-', 'token_type': 'split'},
 {'chunk': 'C', 'token_type': 'reference'},
 {'chunk': ')', 'token_type': 'split'},
 {'chunk': '', 'token_type': 'other'},
 {'chunk': ' and ', 'token_type': 'split'}]
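
The split in `classify_chunks()` depends on the capturing group in its pattern: when the pattern passed to `re.split` is wrapped in parentheses, the delimiters are returned together with the text between them. A minimal illustration on a short, made-up string (the variable names are only for this example):

```python
import re

# A capturing group around the delimiters makes re.split keep them in the output.
with_group = re.split(r'(-| and )', 'A-C and D')

# Without the group, the delimiters are discarded.
without_group = re.split(r'-| and ', 'A-C and D')

print(with_group)     # ['A', '-', 'C', ' and ', 'D']
print(without_group)  # ['A', 'C', 'D']
```

Keeping the delimiters is what later allows a range (`-`, ` to `) to be told apart from an enumeration (`,`, ` and `).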

Those chunks are then concatenated depending on their types using `concat_chunks()`. As a result, the original text is split into strings that are classified as 'caption' (image description) or 'reference' (e.g. '(A-C)').

In [None]:
def concat_chunks(chunk_dicts):

  caption_sections = []
  section_string = ''
  reference_string = ''

  for i, chunk in enumerate(chunk_dicts):
    if chunk['token_type'] == 'other':
      if reference_string != '':
        caption_sections.append({'string': reference_string, 'type': 'reference'})
        reference_string = ''
      section_string += chunk['chunk']
    elif chunk['token_type'] == 'split':
      if reference_string != '':
        reference_string += chunk['chunk']
      else:
        section_string += chunk['chunk']
    elif chunk['token_type'] == 'reference':
      if section_string != '':
        caption_sections.append({'string': section_string, 'type': 'caption'})
        section_string = ''
      reference_string += chunk['chunk']

  if reference_string:
    caption_sections.append({'string': reference_string, 'type': 'reference'})
  if section_string:
    caption_sections.append({'string': section_string, 'type': 'caption'})

  return caption_sections

In [None]:
caption_sections = concat_chunks(chunk_dicts)
caption_sections

[{'string': 'Brain CT scan. There is a mass in the frontal lobe (',
  'type': 'caption'},
 {'string': 'A-C)', 'type': 'reference'},
 {'string': ' and an intracerebral hemorrhage in the right parietotemporal lobe (',
  'type': 'caption'},
 {'string': 'C and D)', 'type': 'reference'},
 {'string': '.', 'type': 'caption'}]

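For each reference section, `expand_ranges()` adds a `tidy_refs` key listing the individual references. Ranges (e.g. 'B-E' or 'B to E') are expanded into all the letters they cover (e.g. B, C, D, E). Stray characters such as `)` can remain at this stage (e.g. 'D)'); they are removed later in `preprocess_caption()`.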

In [None]:
def expand_ranges(caption_sections):
  # This part of the code is used to turn range references (such as 'a-d') to list references (such as 'a, b, c, d').
  pattern_1 = r'(,|;| and )'
  pattern_2 = r'(-| to )'
  list_of_letters = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L']

  for dct in caption_sections:
    if dct['type'] == 'reference':
      dct['tidy_refs'] = []
      refs = re.split(pattern_1, dct['string'])
      for element in refs:
        if element != ' and ':
          consecutive_refs = re.split(pattern_2, element)
          if len(consecutive_refs) == 1:
            dct['tidy_refs'].append(consecutive_refs[0].strip())
          if len(consecutive_refs) > 1:
            range_start = re.sub(r'[^A-Z]', '', consecutive_refs[0].strip())
            range_end = re.sub(r'[^A-Z]', '', consecutive_refs[-1].strip())

            if (range_start in list_of_letters) and (range_end in list_of_letters):
              dct['tidy_refs'].append(range_start)
              reduced_list_of_letters = list_of_letters[list_of_letters.index(range_start)+1:list_of_letters.index(range_end)]
              for letter in reduced_list_of_letters:
                dct['tidy_refs'].append(letter)
              dct['tidy_refs'].append(range_end)
  return caption_sections

In [None]:
caption_sections = expand_ranges(caption_sections)
caption_sections

[{'string': 'Brain CT scan. There is a mass in the frontal lobe (',
  'type': 'caption'},
 {'string': 'A-C)', 'type': 'reference', 'tidy_refs': ['A', 'B', 'C']},
 {'string': ' and an intracerebral hemorrhage in the right parietotemporal lobe (',
  'type': 'caption'},
 {'string': 'C and D)', 'type': 'reference', 'tidy_refs': ['C', 'D)']},
 {'string': '.', 'type': 'caption'}]
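
The range expansion itself can also be expressed with character codes. The helper below, `expand_letter_range`, is a hypothetical name and not part of the notebook; it is a sketch that assumes both endpoints are single uppercase letters, and it covers A to Z rather than the A to L list used above.

```python
def expand_letter_range(start, end):
    """Return every uppercase letter from start to end, inclusive."""
    valid = all(len(s) == 1 and 'A' <= s <= 'Z' for s in (start, end))
    # Descending or invalid ranges return an empty list (an arbitrary choice for this sketch).
    if not valid or start > end:
        return []
    return [chr(code) for code in range(ord(start), ord(end) + 1)]

print(expand_letter_range('B', 'E'))  # ['B', 'C', 'D', 'E']
```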

### Main Pre-Processing Function

The `preprocess_caption()` function uses the secondary functions to turn a caption into a dataframe with one row per reference and its corresponding caption text.

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [None]:
def preprocess_caption(caption_paragraph):

  organized_captions = []
  last_ref = ['common_string']
  for sentence in caption_paragraph.split('.'):
    chunk_dicts = classify_chunks(sentence)
    caption_sections = concat_chunks(chunk_dicts)
    caption_sections = expand_ranges(caption_sections)
    refs = [ref for ref in caption_sections if ref['type'] == 'reference']
    caps = [ref for ref in caption_sections if ref['type'] == 'caption']

    # How captions are assigned to references depends on the number of reference and caption strings, and on their order.
    if (len(refs) == 0):
      organized_captions.append({'sentence': sentence, 'reference':last_ref})
    elif (len(refs) == 1):
      last_ref = refs[-1]['tidy_refs']
      organized_captions.append({'sentence': sentence, 'reference':last_ref})
    else:
      if len(refs) == len(caps):
        for i, r in enumerate(refs):
          last_ref = refs[i]['tidy_refs']
          organized_captions.append({'sentence': caps[i]['string'], 'reference': r['tidy_refs']})
      else:
        for i, c in enumerate(caps):
          if i != len(caps)-1:
            split_c = re.split(r'(,|;| and )', c['string'])
            last_ref = refs[i]['tidy_refs']
            if (len(split_c) == 1) or (i==0):
              organized_captions.append({'sentence': c['string'], 'reference': refs[i]['tidy_refs']})
            else:
              organized_captions.append({'sentence': ','.join(split_c[:-1]), 'reference': refs[i-1]['tidy_refs']})
              organized_captions.append({'sentence': split_c[-1], 'reference': refs[i]['tidy_refs']})
          else:
            organized_captions.append({'sentence': c['string'], 'reference': last_ref})

  # A list of all the present references is created.
  references = []
  for c in organized_captions:
    for r in c['reference']:
      if (r != 'common_string') and (r not in references):
        references.append(r)

  # Mapping references to captions
  mapping_dicts = []
  if references:
    for ref in references:
      r = re.sub(r'[^A-Z]', '', ref) # Special characters are removed from references.
      if r:
        reference_caption = '.'.join([c['sentence'] for c in organized_captions if ((c['reference'] == ['common_string']) or (ref in c['reference']))]) # Strings from the same reference are joined.
        mapping_dicts.append({'reference': r, 'caption': reference_caption})
  else:
    mapping_dicts.append({'reference': 'undivided_caption', 'caption': '. '.join([c['sentence'] for c in organized_captions])}) # In case no split is necessary for a specific caption.

  caption_df = pd.DataFrame(mapping_dicts)
  return caption_df

In [None]:
caption_df = preprocess_caption(sample_caption)
caption_df

| | reference | caption |
|---|---|---|
| 0 | A | Brain CT scan. There is a mass in the frontal lobe ( |
| 1 | B | Brain CT scan. There is a mass in the frontal lobe ( |
| 2 | C | Brain CT scan. There is a mass in the frontal lobe (. and an intracerebral hemorrhage in the right parietotemporal lobe (. |
| 3 | D | Brain CT scan. and an intracerebral hemorrhage in the right parietotemporal lobe (. |


The caption was split and each part was assigned to the correct reference. Some special characters such as `(` may remain after the split, but they do not affect the overall outcome.
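
If cleaner strings are preferred, the leftover parentheses can be removed with a small post-processing step. This is an optional sketch, not part of the original pipeline; `tidy_caption` is a hypothetical helper that only drops an opening parenthesis left dangling before a period or at the end of the string.

```python
import re

def tidy_caption(text):
    # Remove an opening parenthesis (and the spaces before it) that is
    # followed only by a period or by the end of the string.
    text = re.sub(r'\s*\((?=\s*(?:\.|$))', '', text)
    # Collapse any repeated whitespace left behind.
    return re.sub(r'\s+', ' ', text).strip()

print(tidy_caption('Brain CT scan. There is a mass in the frontal lobe ('))
# Brain CT scan. There is a mass in the frontal lobe
```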

## 2. Data Extraction using Spark NLP

The extraction of relevant data from captions is done using [contextual parsers](https://www.johnsnowlabs.com/contextual-parser-increased-flexibility-extracting-entities-in-spark-nlp/) included in the library Spark NLP from [John Snow Labs](https://www.johnsnowlabs.com/). To use the contextual parsers, csv dictionaries and configuration JSON files are needed.

1. CSV Dictionaries:
  - They contain the relevant chunks (such as 'CT scan') and the labels assigned to them when they are extracted (such as 'Imaging_Test').
  - Chunks of different lengths (number of tokens) are stored in separate dictionaries so that longer chunks can be prioritized when using the `ChunkMerger` in the NLP pipeline.
  - The full forms of the dictionaries were created by manually annotating the great majority of the n-grams present in the full corpus of captions (with different values of n).
  - In this example, two tiny dictionaries are created to extract the relevant data from the sample caption.

In [None]:
dict_1 = pd.DataFrame([
    ['Imaging_Finding', 'mass', None],
    ['Site', 'intracerebral', 'Brain'],
    ['Laterality', 'right', None]
    ])

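# header=False keeps the column indices (0, 1, 2) out of the file, so it holds only dictionary entries.
dict_1.to_csv('dict_1.csv', index = False, header = False)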

print('Dictionary of 1-token chunks:')
dict_1

Dictionary of 1-token chunks:


| | 0 | 1 | 2 |
|---|---|---|---|
| 0 | Imaging_Finding | mass | |
| 1 | Site | intracerebral | Brain |
| 2 | Laterality | right | |


In [None]:
dict_2 = pd.DataFrame([
    ['Imaging_Test', 'CT scan', None],
    ['Imaging_Finding', 'intracerebral hemorrhage', None],
    ['Site', 'frontal lobe', 'parietotemporal lobe']
    ])

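# header=False keeps the column indices (0, 1, 2) out of the file, so it holds only dictionary entries.
dict_2.to_csv('dict_2.csv', index = False, header = False)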

print('Dictionary of 2-token chunks:')
dict_2

Dictionary of 2-token chunks:


| | 0 | 1 | 2 |
|---|---|---|---|
| 0 | Imaging_Test | CT scan | |
| 1 | Imaging_Finding | intracerebral hemorrhage | |
| 2 | Site | frontal lobe | parietotemporal lobe |
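
One way to organize annotated chunks into per-length dictionaries is to group them by token count. The sketch below assumes whitespace tokenization and a flat list of `(label, chunk)` pairs; `build_dictionaries` and `annotations` are hypothetical names and are not part of the original code. Each output row follows the layout used above: the label first, followed by the chunks that map to it.

```python
from collections import defaultdict

def build_dictionaries(annotations):
    """Group (label, chunk) pairs into one list of rows per token count."""
    grouped = defaultdict(dict)
    for label, chunk in annotations:
        n_tokens = len(chunk.split())  # whitespace tokenization (an assumption)
        grouped[n_tokens].setdefault(label, []).append(chunk)
    # Each row is [label, chunk_1, chunk_2, ...].
    return {n: [[label, *chunks] for label, chunks in labels.items()]
            for n, labels in grouped.items()}

annotations = [
    ('Imaging_Finding', 'mass'),
    ('Site', 'intracerebral'),
    ('Site', 'Brain'),
    ('Laterality', 'right'),
    ('Imaging_Test', 'CT scan'),
    ('Imaging_Finding', 'intracerebral hemorrhage'),
    ('Site', 'frontal lobe'),
    ('Site', 'parietotemporal lobe'),
]

dictionaries = build_dictionaries(annotations)
for n_tokens, rows in sorted(dictionaries.items()):
    print(n_tokens, rows)
```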


2. Configuration JSON Files:
  - They contain values for different parameters used by the contextual parsers.
  - In this example, there is one JSON file for single-token chunks, and another one for multiple-token chunks.

In [None]:
import json

In [None]:
### Parser jsons
json_1 = {
  "entity": "chunk",
  "ruleScope": "sentence",
  "matchScope":"token",
  "completeMatchRegex": "true"
}

with open('json_1.json', 'w') as f:
  json.dump(json_1, f)

json_2 = {
  "entity": "chunk",
  "ruleScope": "document",
  "matchScope":"sub-token",
}

with open('json_2.json', 'w') as f:
  json.dump(json_2, f)

### Pipeline Creation

First, we need to install the library and start a Spark session. To use the contextual parsers, a Spark NLP for Healthcare license is needed.

In [None]:
%%capture
!pip install johnsnowlabs

In [None]:
from johnsnowlabs import nlp, medical
nlp.install()

Downloading license...
Licenses extracted successfully
📋 Stored John Snow Labs License in /root/.johnsnowlabs/licenses/license_number_0_for_Spark-Healthcare_Spark-OCR.json
👷 Setting up  John Snow Labs home in /root/.johnsnowlabs, this might take a few minutes.
Downloading 🐍+🚀 Python Library spark_nlp-5.1.1-py2.py3-none-any.whl
Downloading 🐍+💊 Python Library spark_nlp_jsl-5.1.1-py3-none-any.whl
Downloading 🫘+🚀 Java Library spark-nlp-assembly-5.1.1.jar
Downloading 🫘+💊 Java Library spark-nlp-jsl-5.1.1.jar
🙆 JSL Home setup in /root/.johnsnowlabs
Installing /root/.johnsnowlabs/py_installs/spark_nlp_jsl-5.1.1-py3-none-any.whl to /usr/bin/python3
Installed 1 products:
💊 Spark-Healthcare==5.1.1 installed! ✅ Heal the planet with NLP! 


In [None]:
spark = nlp.start()

👌 Launched cpu optimized session with with: 🚀Spark-NLP==5.1.1, 💊Spark-Healthcare==5.1.1, running on ⚡ PySpark==3.1.2


Then the NLP pipeline is created, including:
- Pre-processing steps: a `DocumentAssembler`, a `SentenceDetector` and a `RegexTokenizer`.
- Contextual Parsers: dictionaries with longer n-grams are incorporated first so that the `ChunkMerger` prioritizes them.
- `ChunkMerger`: used to combine the outcomes from different Contextual Parsers.
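
To make the prioritization concrete, the plain-Python sketch below mimics the intended effect on overlapping chunks: candidate lists are visited in input order, and a chunk is kept only if it does not overlap one that was already kept. This is a conceptual illustration only, not Spark NLP's `ChunkMerger` implementation; `merge_chunks` and the `(start, end, text, label)` tuples (with exclusive end offsets) are assumptions made for the example.

```python
def merge_chunks(chunk_lists):
    """Merge chunk lists, giving precedence to the lists that come first."""
    kept = []
    for chunks in chunk_lists:
        for start, end, text, label in chunks:
            # Two spans overlap when each one starts before the other ends.
            overlaps = any(start < k_end and k_start < end
                           for k_start, k_end, _, _ in kept)
            if not overlaps:
                kept.append((start, end, text, label))
    return sorted(kept)

# Character offsets refer to the string 'an intracerebral hemorrhage'.
two_token = [(3, 27, 'intracerebral hemorrhage', 'Imaging_Finding')]
one_token = [(3, 16, 'intracerebral', 'Site')]

print(merge_chunks([two_token, one_token]))
# [(3, 27, 'intracerebral hemorrhage', 'Imaging_Finding')]
```

Passing the lists in the opposite order would keep the shorter chunk instead, which is why the dictionaries with longer n-grams are listed first.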

In [None]:
def create_nlp_pipeline(spark_session, csv_dicts, json_1, json_multiple):

  pipeline_stages = []
  cp_columns = []

  ## Pre-processing steps

  pipeline_stages.append(nlp.DocumentAssembler().setInputCol("caption").setOutputCol("document"))
  pipeline_stages.append(nlp.SentenceDetector().setInputCols(["document"]).setOutputCol("sentence"))
  # Split on whitespace and on the positions right before or after a punctuation character.
  pattern = r"\s+|(?=[-.:;*+,&%\(\)\[\]])|(?<=[-.:;*+,&%\(\)\[\]])"
  pipeline_stages.append(nlp.RegexTokenizer().setInputCols(["sentence"]).setOutputCol("token").setPattern(pattern).setPositionalMask(False))

  ## Contextual Parsers

  for cp_dict in csv_dicts:
    cp_column = cp_dict[:-4]
    cp_columns.append(cp_column)

    if '1' in cp_column:
      json_file = json_1
    else:
      json_file = json_multiple

    cp = medical.ContextualParserApproach() \
      .setInputCols(["sentence", "token"])\
      .setOutputCol(cp_column)\
      .setJsonPath(json_file)\
      .setCaseSensitive(False)\
      .setDictionary(cp_dict, options={"delimiter":","})

    pipeline_stages.append(cp)

  ## ChunkMerger

  pipeline_stages.append(medical.ChunkMergeApproach().setInputCols(cp_columns).setOutputCol("ner_chunk"))

  cp_pipeline = nlp.Pipeline(stages=pipeline_stages)

  empty_data = spark_session.createDataFrame([[""]]).toDF("caption")

  return cp_pipeline.fit(empty_data)
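
The tokenizer pattern splits on whitespace and on the zero-width positions immediately before or after a punctuation character, so each punctuation mark becomes its own token. The snippet below approximates this with Python's `re` module (3.7 or later, which allows splitting on empty matches). Spark NLP applies the pattern with Java's regex engine, so edge cases may differ; `rough_tokenize` is a hypothetical helper used only for illustration.

```python
import re

punct = r'[-.:;*+,&%()\[\]]'
pattern = rf'\s+|(?={punct})|(?<={punct})'

def rough_tokenize(text):
    # Empty strings appear where two split points are adjacent, so they are dropped.
    return [tok for tok in re.split(pattern, text) if tok]

print(rough_tokenize('frontal lobe (A-C).'))
# ['frontal', 'lobe', '(', 'A', '-', 'C', ')', '.']
```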

The pipeline is now created and used for data extraction:

In [None]:
# The pre-processed captions (one row per reference) are the pipeline input.
spark_df = spark.createDataFrame(caption_df)

nlp_model = create_nlp_pipeline(spark_session = spark, csv_dicts = ['dict_2.csv', 'dict_1.csv'], json_1 = 'json_1.json', json_multiple = 'json_2.json')

results = nlp_model.transform(spark_df)

The extractions are turned into a dataframe with all the extracted chunks and their corresponding labels exploded in different rows.

In [None]:
from pyspark.sql import functions as F

In [None]:
extraction_df = results.select('reference', 'caption', F.explode(F.arrays_zip(results.ner_chunk.result, results.ner_chunk.metadata)).alias("cols")) \
                  .select('reference', 'caption', F.expr("cols['0']").alias("chunk"), F.expr("cols['1']['normalized']").alias("ner_label")).toPandas()

extraction_df

| | reference | caption | chunk | ner_label |
|---|---|---|---|---|
| 0 | A | Brain CT scan. There is a mass in the frontal lobe ( | brain | Site |
| 1 | A | Brain CT scan. There is a mass in the frontal lobe ( | ct scan | Imaging_Test |
| 2 | A | Brain CT scan. There is a mass in the frontal lobe ( | mass | Imaging_Finding |
| 3 | A | Brain CT scan. There is a mass in the frontal lobe ( | frontal lobe | Site |
| 4 | B | Brain CT scan. There is a mass in the frontal lobe ( | brain | Site |
| 5 | B | Brain CT scan. There is a mass in the frontal lobe ( | ct scan | Imaging_Test |
| 6 | B | Brain CT scan. There is a mass in the frontal lobe ( | mass | Imaging_Finding |
| 7 | B | Brain CT scan. There is a mass in the frontal lobe ( | frontal lobe | Site |
| 8 | C | Brain CT scan. There is a mass in the frontal lobe (. and an intracerebral hemorrhage in the right parietotemporal lobe (. | brain | Site |
| 9 | C | Brain CT scan. There is a mass in the frontal lobe (. and an intracerebral hemorrhage in the right parietotemporal lobe (. | ct scan | Imaging_Test |
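
Conceptually, `arrays_zip` pairs the i-th extracted chunk with the i-th metadata entry, and `explode` emits one output row per pair. The plain-Python sketch below mirrors that logic on a hand-written row; the row structure is a simplification assumed for illustration, not the actual Spark schema.

```python
# One input row with parallel lists of chunks and metadata (simplified).
row = {
    'reference': 'A',
    'chunks': ['brain', 'ct scan'],
    'metadata': [{'normalized': 'Site'}, {'normalized': 'Imaging_Test'}],
}

# Pair chunks with their metadata and emit one record per pair.
exploded = [
    {'reference': row['reference'], 'chunk': chunk, 'ner_label': meta['normalized']}
    for chunk, meta in zip(row['chunks'], row['metadata'])
]

for record in exploded:
    print(record)
```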


## 3. Data Normalization

Once the data is extracted, chunks are mapped to normalized labels using a normalization dictionary. This dictionary is a manually annotated dataframe whose labels are much more granular (detailed) than the ones assigned by the Contextual Parsers. Normalization labels make it possible to recognize different forms of the same entity (e.g. 'ct scan' and 'computed tomography' are both mapped to 'ct'), and to assign multiple labels to a single extracted chunk (e.g. 'intracerebral hemorrhage' is labeled both as 'problem' and 'brain').

In [None]:
normalization_df = pd.DataFrame([
    ['brain', '', '', 'brain', ''],
    ['frontal lobe', '', '', 'brain', ''],
    ['parietotemporal lobe', '', '', 'brain', ''],
    ['intracerebral', '', '', 'brain', ''],
    ['ct scan', 'ct', '', '', ''],
    ['computed tomography', 'ct', '', '', ''],
    ['right', '', '', '', 'right'],
    ['mass', '', 'problem', '', ''],
    ['intracerebral hemorrhage', '', 'problem', 'brain', '']], columns = ['chunk', 'imaging_test', 'imaging_finding', 'site', 'laterality'])

print('Sample normalization dict:')
normalization_df

Sample normalization dict:


| | chunk | imaging_test | imaging_finding | site | laterality |
|---|---|---|---|---|---|
| 0 | brain | | | brain | |
| 1 | frontal lobe | | | brain | |
| 2 | parietotemporal lobe | | | brain | |
| 3 | intracerebral | | | brain | |
| 4 | ct scan | ct | | | |
| 5 | computed tomography | ct | | | |
| 6 | right | | | | right |
| 7 | mass | | problem | | |
| 8 | intracerebral hemorrhage | | problem | brain | |


To create the outcome dataframe:
- the extracted data is merged with the normalization dictionary
- reference values are used to create image IDs, and
- the dataframe is grouped by caption and image ID.

In [None]:
extraction_df = pd.merge(extraction_df, normalization_df, on='chunk', how='left')

In [None]:
extraction_df['image_id'] = extraction_df['reference'].apply(lambda x: f"image_{x}")
extraction_df.drop('reference', axis = 1, inplace = True)

In [None]:
def aggregate_without_empty_strings(x):
  lst = []
  for element in x:
    if element and element not in lst:
      lst.append(element)
  return lst
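
Because the merge above uses `how='left'`, a chunk that is missing from the normalization dictionary would receive `NaN` in the label columns, and `NaN` is truthy in Python, so it would pass the `if element` check. Every chunk in this example is covered, so the issue does not arise here. A possible variant that keeps only non-empty strings is sketched below; `aggregate_strings_only` is a hypothetical name, not part of the original code.

```python
def aggregate_strings_only(values):
    """Collect unique, non-empty strings in order of first appearance."""
    unique = []
    for value in values:
        # Skip empty strings and non-string values such as NaN or None.
        if isinstance(value, str) and value and value not in unique:
            unique.append(value)
    return unique

print(aggregate_strings_only(['', 'brain', float('nan'), 'brain', 'ct']))
# ['brain', 'ct']
```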

In [None]:
label_df = extraction_df.groupby(['image_id', 'caption']).agg(aggregate_without_empty_strings).reset_index().copy()

In [None]:
label_df

| | image_id | caption | chunk | ner_label | imaging_test | imaging_finding | site | laterality |
|---|---|---|---|---|---|---|---|---|
| 0 | image_A | Brain CT scan. There is a mass in the frontal lobe ( | [brain, ct scan, mass, frontal lobe] | [Site, Imaging_Test, Imaging_Finding] | [ct] | [problem] | [brain] | [] |
| 1 | image_B | Brain CT scan. There is a mass in the frontal lobe ( | [brain, ct scan, mass, frontal lobe] | [Site, Imaging_Test, Imaging_Finding] | [ct] | [problem] | [brain] | [] |
| 2 | image_C | Brain CT scan. There is a mass in the frontal lobe (. and an intracerebral hemorrhage in the right parietotemporal lobe (. | [brain, ct scan, mass, frontal lobe, intracerebral hemorrhage, right, parietotemporal lobe] | [Site, Imaging_Test, Imaging_Finding, Laterality] | [ct] | [problem] | [brain] | [right] |
| 3 | image_D | Brain CT scan. and an intracerebral hemorrhage in the right parietotemporal lobe (. | [brain, ct scan, intracerebral hemorrhage, right, parietotemporal lobe] | [Site, Imaging_Test, Imaging_Finding, Laterality] | [ct] | [problem] | [brain] | [right] |
