This notebook preprocesses the original (raw) ShARe-13, ShARe-14 and CADEC datasets into the DocDiscNER format.



In [None]:
# Mount Google Drive at /content/drive; the dataset paths used below live under it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


1) ShARe-13 Preprocessing

In [None]:
import csv
import os

def extract_spans(annotation_file, text_file):
    """Extract mention texts and character offsets for one ShARe-13 report.

    Each annotation line is split on ``||``. Fields from index 3 onwards are
    read as alternating start/end character offsets into the report text, so
    a mention made of several segments carries more than one start/end pair.

    Args:
        annotation_file: Path to the ``||``-delimited annotation file.
        text_file: Path to the report text that the offsets index into.

    Returns:
        A tuple ``(extracted_spans, spans)`` of two parallel lists.
        ``extracted_spans[i]`` is the mention text (segments joined by a
        single space, newlines replaced by spaces) and ``spans[i]`` is the
        matching offset string, e.g. ``"10-20"`` or ``"10-20,35-40"``.

    Raises:
        ValueError: If an offset field is not an integer.
        IndexError: If a line ends with an unpaired start offset.
    """
    # The report text is identical for every annotation line: read it once
    # instead of re-opening the file inside the loop.
    with open(text_file, 'r') as txt_file:
        text = txt_file.read()

    spans = []
    extracted_spans = []
    with open(annotation_file, 'r') as ann_file:
        for line in ann_file:
            parts = line.strip().split("||")
            # Blank or truncated lines carry no offsets. Skipping them here
            # keeps the two output lists aligned index-for-index.
            if len(parts) < 5:
                continue

            # One segment yields a single pair; several segments yield several.
            offset_fields = parts[3:]
            line_span = [
                (int(offset_fields[i]), int(offset_fields[i + 1]))
                for i in range(0, len(offset_fields), 2)
            ]
            spans.append(",".join(f"{start}-{end}" for start, end in line_span))

            mention = " ".join(text[start:end] for start, end in line_span)
            extracted_spans.append(mention.strip().replace('\n', ' '))
    return extracted_spans, spans


def process_data(text_folder, annotation_folder, output_file):
    """Write one CSV row per annotated ShARe-13 report.

    For every file in ``annotation_folder`` the report ID is taken as the
    file name up to its first ``.``; the matching text is expected at
    ``<text_folder>/<report_id>.txt``.

    Args:
        text_folder: Directory containing the report ``.txt`` files.
        annotation_folder: Directory containing one annotation file per report.
        output_file: Path of the CSV to create (overwritten if it exists).

    The CSV has the columns ``Report_ID``, ``Text``, ``Spans`` (mention texts,
    newline-separated) and ``Offsets`` (offset strings, newline-separated).
    """
    with open(output_file, 'w', newline='') as output:
        writer = csv.writer(output)
        writer.writerow(["Report_ID", "Text", "Spans", "Offsets"])

        # os.listdir returns entries in arbitrary order; sorting makes the
        # row order of the output reproducible between runs.
        for ann_file in sorted(os.listdir(annotation_folder)):
            annotation_path = os.path.join(annotation_folder, ann_file)
            # Ignore sub-directories (e.g. notebook checkpoint folders), which
            # have no matching report text.
            if not os.path.isfile(annotation_path):
                continue

            report_id = ann_file.split('.')[0]
            text_path = os.path.join(text_folder, report_id + '.txt')

            # extract_spans returns (mention texts, offset strings).
            spans, offsets = extract_spans(annotation_path, text_path)
            # Use a context manager so the text file handle is closed promptly.
            with open(text_path, 'r') as txt_file:
                text = txt_file.read()

            writer.writerow([report_id, text, '\n'.join(spans), '\n'.join(offsets)])

# Folders holding the raw ShARe-13 report texts and annotation files,
# and the name of the CSV that process_data writes.
text_folder = '/content/drive/MyDrive/DocDiscNER/Datasets/Share13/Train/text'
annotation_folder = '/content/drive/MyDrive/DocDiscNER/Datasets/Share13/Train/ann'
output_file = 'Doc-ShaRe-13.csv'

# Build the document-level CSV for the ShARe-13 training split.
process_data(text_folder, annotation_folder, output_file)

In [None]:
# Now take the resulting output and chunk it in DocDiscNER_Chunker.ipynb

2) ShARe-14 Preprocessing

In [None]:
import csv
import os

def extract_spans(annotation_file, text_file):
    """Extract de-duplicated mention texts and offsets for one ShARe-14 report.

    Each annotation line is split on ``|``; the second field holds the
    character offsets as ``start-end``, with several segments separated by
    commas (``start-end,start-end``).

    Args:
        annotation_file: Path to the ``|``-delimited annotation file.
        text_file: Path to the report text that the offsets index into.

    Returns:
        A tuple ``(extracted_spans, spans)`` of two parallel lists.
        ``extracted_spans[i]`` is the mention text (segments joined by a
        single space, newlines replaced by spaces) and ``spans[i]`` is the
        matching offset string. When the same offset string occurs more than
        once, only its last occurrence is kept.

    Raises:
        IndexError: If a non-blank line has no ``|``-separated second field.
        ValueError: If a segment is not of the form ``<int>-<int>``.
    """
    # The report text is identical for every annotation line: read it once
    # instead of re-opening the file inside the loop.
    with open(text_file, 'r') as txt_file:
        text = txt_file.read()

    spans = []
    extracted_spans = []
    with open(annotation_file, 'r') as ann_file:
        for line in ann_file:
            stripped = line.strip()
            # A blank line has no offset field; skip it rather than crash.
            if not stripped:
                continue

            offsets_field = stripped.split("|")[1]
            # split(",") also handles the single-segment case.
            line_span = []
            for segment in offsets_field.split(","):
                start, end = map(int, segment.split("-"))
                line_span.append((start, end))
            spans.append(",".join(f"{start}-{end}" for start, end in line_span))

            mention = " ".join(text[start:end] for start, end in line_span)
            extracted_spans.append(mention.strip().replace('\n', ' '))

    # ShARe-14 contains duplicate annotations. Walk both lists from the end so
    # the last occurrence of each offset string is the one that survives, then
    # restore the original order.
    # If CUIs are added in future, they must be de-duplicated in parallel with
    # spans and extracted_spans (or this de-duplication must be removed).
    seen_offsets = set()
    unique_spans = []
    unique_extracted = []
    for offsets, mention in zip(reversed(spans), reversed(extracted_spans)):
        if offsets in seen_offsets:
            continue
        seen_offsets.add(offsets)
        unique_spans.append(offsets)
        unique_extracted.append(mention)
    unique_spans.reverse()
    unique_extracted.reverse()
    return unique_extracted, unique_spans


def process_data(text_folder, annotation_folder, output_file):
    """Write one CSV row per annotated ShARe-14 report.

    For every file in ``annotation_folder`` the report ID is taken as the
    file name up to its first ``.``; the matching text is expected at
    ``<text_folder>/<report_id>.txt``.

    Args:
        text_folder: Directory containing the report ``.txt`` files.
        annotation_folder: Directory containing one annotation file per report.
        output_file: Path of the CSV to create (overwritten if it exists).

    The CSV has the columns ``Report_ID``, ``Text``, ``Spans`` (mention texts,
    newline-separated) and ``Offsets`` (offset strings, newline-separated).
    """
    with open(output_file, 'w', newline='') as output:
        writer = csv.writer(output)
        writer.writerow(["Report_ID", "Text", "Spans", "Offsets"])

        # os.listdir returns entries in arbitrary order; sorting makes the
        # row order of the output reproducible between runs.
        for ann_file in sorted(os.listdir(annotation_folder)):
            annotation_path = os.path.join(annotation_folder, ann_file)
            # Ignore sub-directories (e.g. notebook checkpoint folders), which
            # have no matching report text.
            if not os.path.isfile(annotation_path):
                continue

            report_id = ann_file.split('.')[0]
            text_path = os.path.join(text_folder, report_id + '.txt')

            # extract_spans returns (mention texts, offset strings).
            spans, offsets = extract_spans(annotation_path, text_path)
            # Use a context manager so the text file handle is closed promptly.
            with open(text_path, 'r') as txt_file:
                text = txt_file.read()

            writer.writerow([report_id, text, '\n'.join(spans), '\n'.join(offsets)])

# Folders holding the raw ShARe-14 report texts and annotation files,
# and the name of the file that process_data writes.
# NOTE(review): unlike the other output names in this notebook, this one has
# no .csv extension — confirm that is intended.
text_folder = '/content/drive/MyDrive/DocDiscNER/Datasets/Share14/test/text'
annotation_folder = '/content/drive/MyDrive/DocDiscNER/Datasets/Share14/test/ann'
output_file = 'Test-Doc-ShaRe-14'

# Build the document-level CSV for the ShARe-14 test split.
process_data(text_folder, annotation_folder, output_file)

In [None]:
# Now take the resulting output and chunk it in DocDiscNER_Chunker.ipynb

CADEC Preprocessing

In [None]:
# Since CADEC is not lengthy, no need for chunking, we are just going to preprocess CADEC to the DocDiscNER format
import csv
import os
import re

def extract_spans(annotation_file):
    """Read mention texts and cleaned offset strings from a CADEC annotation file.

    Every line is split on tabs; lines with fewer than three fields are
    ignored. The second field is stripped of the literal ``CONCEPT_LESS``,
    of stand-alone 8-digit numbers (presumably the MedDRA codes) and of ``+``
    signs, each together with one following whitespace character if present.
    What remains is kept as the offset string. The third field is the
    mention text.

    Args:
        annotation_file: Path to the tab-separated annotation file.

    Returns:
        A tuple ``(extracted_spans, spans)`` of two parallel lists: the
        mention texts and their cleaned offset strings.
    """
    # Removed from the offsets field one after another, in this order.
    noise_patterns = (r"CONCEPT_LESS\s?", r"\b\d{8}\b\s?", r"\+\s?")

    mention_texts = []
    offset_strings = []
    with open(annotation_file, 'r') as handle:
        for record in handle:
            fields = record.split('\t')
            if len(fields) >= 3:
                cleaned = fields[1].strip()
                for pattern in noise_patterns:
                    cleaned = re.sub(pattern, "", cleaned)

                mention_texts.append(fields[2].strip())
                offset_strings.append(cleaned)

    return mention_texts, offset_strings


def process_data(text_folder, annotation_folder, output_file):
    """Write one CSV row per annotated CADEC post.

    For every file in ``annotation_folder`` the report ID is the file name
    with its last ``.``-separated extension removed; the matching text is
    expected at ``<text_folder>/<report_id>.txt``.

    Args:
        text_folder: Directory containing the post ``.txt`` files.
        annotation_folder: Directory containing one annotation file per post.
        output_file: Path of the CSV to create (overwritten if it exists).

    The CSV has the columns ``Report_ID``, ``Text``, ``Spans`` (mention texts,
    newline-separated) and ``Offsets`` (offset strings, newline-separated).
    """
    with open(output_file, 'w', newline='') as output:
        writer = csv.writer(output)
        writer.writerow(["Report_ID", "Text", "Spans", "Offsets"])

        # os.listdir returns entries in arbitrary order; sorting makes the
        # row order of the output reproducible between runs.
        for ann_file in sorted(os.listdir(annotation_folder)):
            annotation_path = os.path.join(annotation_folder, ann_file)
            # Ignore sub-directories (e.g. notebook checkpoint folders), which
            # have no matching post text.
            if not os.path.isfile(annotation_path):
                continue

            # Only the last extension is dropped, so dots inside the ID survive.
            report_id = '.'.join(ann_file.split('.')[:-1])
            text_path = os.path.join(text_folder, report_id + '.txt')

            # extract_spans returns (mention texts, offset strings).
            spans, offsets = extract_spans(annotation_path)
            # Use a context manager so the text file handle is closed promptly.
            with open(text_path, 'r') as txt_file:
                text = txt_file.read()

            writer.writerow([report_id, text, '\n'.join(spans), '\n'.join(offsets)])

# Folders holding the raw CADEC post texts and the MedDRA annotation files,
# and the name of the CSV that process_data writes.
text_folder = '/content/drive/MyDrive/DocDiscNER/Datasets/CADEC/text'
annotation_folder = '/content/drive/MyDrive/DocDiscNER/Datasets/CADEC/meddra'
output_file = 'Doc-CADEC.csv'

# Build the document-level CSV for CADEC.
process_data(text_folder, annotation_folder, output_file)

Splitting CADEC into Train, Val, Test

In [None]:
# the splits are based on the IDs provided by (Dai et al.,2020): https://github.com/daixiangau/acl2020-transition-discontinuous-ner/tree/master/data/cadec/split
import pandas as pd

# Load Doc-CADEC.csv (presumably the file written by the CADEC preprocessing cell).
df = pd.read_csv('/content/Doc-CADEC.csv')
# Validation report IDs, one per line.
with open('/content/Val_ID.txt', 'r') as f:
    validation_ids = f.read().splitlines()

# Test report IDs, one per line.
with open('/content/Test_ID.txt', 'r') as f:
    test_ids = f.read().splitlines()

# Sets are used for the membership checks in determine_set.
validation_set = set(validation_ids)
test_set = set(test_ids)

def determine_set(row):
    """Return the split name for a row, based on its ``Report_ID``.

    IDs found in ``validation_set`` map to ``'Validation'``, IDs found in
    ``test_set`` map to ``'Test'``, and every other ID maps to ``'Train'``.
    The validation set is checked first.
    """
    if row['Report_ID'] in validation_set:
        return 'Validation'
    if row['Report_ID'] in test_set:
        return 'Test'
    return 'Train'

# Label every row with the split it belongs to.
df['Set'] = df.apply(determine_set, axis=1)

# Select each split and remove the helper 'Set' column in the same step.
# drop() without inplace returns a new DataFrame, so nothing is modified on a
# slice of df and pandas does not emit SettingWithCopyWarning.
train_df = df[df['Set'] == 'Train'].drop(columns=['Set'])
validation_df = df[df['Set'] == 'Validation'].drop(columns=['Set'])
test_df = df[df['Set'] == 'Test'].drop(columns=['Set'])

# Save the split datasets to new CSV files.
train_df.to_csv('train_dataset.csv', index=False)
validation_df.to_csv('validation_dataset.csv', index=False)
test_df.to_csv('test_dataset.csv', index=False)


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df.drop(columns=['Set'], inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  validation_df.drop(columns=['Set'], inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df.drop(columns=['Set'], inplace=True)


Format CADEC spans to ADR: ---;

In [None]:
import pandas as pd
import re

# Load the CADEC split whose spans are reformatted below.
# NOTE(review): the splitting cell writes 'validation_dataset.csv'; this path
# presumably points at a renamed copy of that file — confirm.
df = pd.read_csv("/content/CADEC_Validation.csv")

# Because CADEC has some NaN spans, I added condition for that
def format_spans(row):
    """Turn a row's newline-separated spans into ``disorder: <span>; ...``.

    The ``Spans`` value is converted to ``str`` first; a value whose string
    form is ``"nan"`` (a missing cell) yields an empty string.
    """
    raw = str(row["Spans"])
    if raw == "nan":
        # Missing value: nothing to format.
        return ""

    labelled = []
    for piece in raw.split("\n"):
        labelled.append("disorder: " + piece)
    return "; ".join(labelled)

# Replace each row's raw spans with the formatted "disorder: ..." string.
df["Spans"] = df.apply(format_spans, axis=1)

# Keep only the text and formatted spans columns, then write them out.
new_df = df[["Text", "Spans"]]
new_df.to_csv("preprocessed_dataset.csv", index=False)