In [1]:
import pandas as pd
from pprint import pprint

In [2]:
# Parse the rules HTML page and keep the first table it contains.
# NOTE(review): no `header=` argument is passed, so the table's own header
# row arrives as data row 0 under integer column labels.
rules_table = pd.read_html('../docs/hippa_safe_harbour_rules.html')[0]

In [3]:
rules_table

Unnamed: 0,0,1,2,3,4
0,Tag,Attribute Name,TCIA Profile,TCIA Implementation,Final CTP Script
1,"(0000,1000)",Affected SOP Instance UID,X,remove,@remove()
2,"(0000,1001)",Requested SOP Instance UID,U,hashuid,"@hashuid(@UIDROOT,this)"
3,"(0002,0003)",Media Storage SOP Instance UID,U,hashuid,"@hashuid(@UIDROOT,this)"
4,"(0004,1511)",Referenced SOP Instance UID in File,U,hashuid,"@hashuid(@UIDROOT,this)"
...,...,...,...,...,...
606,"(0074,1236)",Requesting AE,X,remove,@remove()
607,"(0008,0054)",Retrieve AE Title,X,remove,@remove()
608,"(0072,005E)",Selector AE Value,D,remove,@remove()
609,"(0064,0003)",Source Frame of Reference UID,U,hashuid,"@hashuid(@UIDROOT,this)"


In [4]:
# Promote the first data row to column labels. The table was loaded with
# integer column labels (0..n-1) and its real header stored as row 0.
# The guard keeps the cell idempotent: without it every re-run dropped one
# more data row and overwrote the header with that row's values.
if rules_table.columns.tolist() == list(range(rules_table.shape[1])):
    new_header = rules_table.iloc[0]  # grab the first row for the header
    rules_table = rules_table[1:]
    rules_table.columns = new_header  # set the header row as the df header

In [5]:
rules_table

Unnamed: 0,Tag,Attribute Name,TCIA Profile,TCIA Implementation,Final CTP Script
1,"(0000,1000)",Affected SOP Instance UID,X,remove,@remove()
2,"(0000,1001)",Requested SOP Instance UID,U,hashuid,"@hashuid(@UIDROOT,this)"
3,"(0002,0003)",Media Storage SOP Instance UID,U,hashuid,"@hashuid(@UIDROOT,this)"
4,"(0004,1511)",Referenced SOP Instance UID in File,U,hashuid,"@hashuid(@UIDROOT,this)"
5,"(0008,0012)",Instance Creation Date,C,incrementdate,"@incrementdate(this,@DATEINC)"
...,...,...,...,...,...
606,"(0074,1236)",Requesting AE,X,remove,@remove()
607,"(0008,0054)",Retrieve AE Title,X,remove,@remove()
608,"(0072,005E)",Selector AE Value,D,remove,@remove()
609,"(0064,0003)",Source Frame of Reference UID,U,hashuid,"@hashuid(@UIDROOT,this)"


In [6]:
# How many rules fall under each 'TCIA Profile' code.
rules_table['TCIA Profile'].value_counts()

TCIA Profile
C    284
X    202
U     54
Z     32
D     29
K      9
Name: count, dtype: int64

In [7]:
# How many rules use each 'TCIA Implementation' action.
rules_table['TCIA Implementation'].value_counts()

TCIA Implementation
remove           237
keep             121
incrementdate     96
hashuid           55
time              43
empty             22
replace           20
process           12
lookup             2
hashname           1
Remove Unsafe      1
Name: count, dtype: int64

In [8]:
# Distinct implementation names; each one becomes a key of the `rules`
# mapping built further down.
unique_functions = rules_table['TCIA Implementation'].unique()
unique_functions

array(['remove', 'hashuid', 'incrementdate', 'time', 'empty', 'keep',
       'process', 'lookup', 'replace', 'hashname', 'Remove Unsafe'],
      dtype=object)

In [9]:
def tag_str_to_hex_str(dicomtag: str) -> str:
    """Rewrite a tag such as ``"(0008,0018)"`` as ``"(0x0008, 0x0018)"``.

    Args:
        dicomtag: Tag text made of a group and an element separated by a
            comma, optionally wrapped in parentheses and padded with
            whitespace.

    Returns:
        The tag as a string that looks like a tuple of two hex literals.
        The group and element text is copied verbatim, not validated as hex.

    Raises:
        IndexError: If ``dicomtag`` contains no comma.
    """
    # Trim outer whitespace first: str.strip("()") stops at the first
    # character that is not a parenthesis, so " (0008,0018) " would otherwise
    # keep both parentheses and leak them into the result.
    parts = dicomtag.strip().strip("()").split(",")

    # Only the first two comma-separated fields are used.
    return f"(0x{parts[0].strip()}, 0x{parts[1].strip()})"

def rules_to_tag_name(rule:str):
    """Build the ``<RULE>_TAGS`` identifier for a rule name.

    ``/`` becomes ``_`` and ``*`` becomes ``_STAR`` before the ``_TAGS``
    suffix is appended.
    """
    substitutions = str.maketrans({'/': '_', '*': '_STAR'})
    return rule.translate(substitutions) + "_TAGS"

def filter_list_from_list(target: list, source: list):
    """Return the items of ``target`` that do not occur in ``source``.

    Order and duplicates of ``target`` are preserved; neither input is
    modified.
    """
    return [candidate for candidate in target if candidate not in source]

In [10]:
# Map every implementation name to the hex-formatted tags that use it.
rules = {}

for r in unique_functions:
    uses_rule = rules_table['TCIA Implementation'] == r
    rule_tags = [
        tag_str_to_hex_str(t)
        for t in rules_table.loc[uses_rule, 'Tag'].tolist()
    ]
    rules[r] = rule_tags

In [11]:
rules

{'remove': ['(0x0000, 0x1000)',
  '(0x0008, 0x0080)',
  '(0x0008, 0x0081)',
  '(0x0008, 0x0082)',
  '(0x0008, 0x0092)',
  '(0x0008, 0x0094)',
  '(0x0008, 0x0096)',
  '(0x0008, 0x009D)',
  '(0x0008, 0x0201)',
  '(0x0008, 0x1010)',
  '(0x0008, 0x1040)',
  '(0x0008, 0x1041)',
  '(0x0008, 0x1048)',
  '(0x0008, 0x1049)',
  '(0x0008, 0x1050)',
  '(0x0008, 0x1052)',
  '(0x0008, 0x1060)',
  '(0x0008, 0x1062)',
  '(0x0008, 0x1070)',
  '(0x0008, 0x1072)',
  '(0x0008, 0x1120)',
  '(0x0008, 0x2111)',
  '(0x0008, 0x4000)',
  '(0x0010, 0x0021)',
  '(0x0010, 0x0032)',
  '(0x0010, 0x0050)',
  '(0x0010, 0x0101)',
  '(0x0010, 0x0102)',
  '(0x0010, 0x1000)',
  '(0x0010, 0x1001)',
  '(0x0010, 0x1002)',
  '(0x0010, 0x1005)',
  '(0x0010, 0x1040)',
  '(0x0010, 0x1050)',
  '(0x0010, 0x1060)',
  '(0x0010, 0x1080)',
  '(0x0010, 0x1081)',
  '(0x0010, 0x1090)',
  '(0x0010, 0x1100)',
  '(0x0010, 0x2110)',
  '(0x0010, 0x2150)',
  '(0x0010, 0x2152)',
  '(0x0010, 0x2154)',
  '(0x0010, 0x2155)',
  '(0x0010, 0x2180)',


In [12]:
import json

# Persist the implementation -> tags mapping as indented UTF-8 JSON.
with open('../docs/tcia_supplied_deid_attrs.json', 'w', encoding='utf-8') as f:
    f.write(json.dumps(rules, ensure_ascii=False, indent=4))

### Check the TCIA anonymizer

In [1]:
import os
import sys
from pathlib import Path

import pandas as pd
# setting path
sys.path.append('../')

In [2]:
# Root of the local de-identification workspace. Set the DEID_ROOTDIR
# environment variable to run the notebook on another machine; when it is
# unset the original hard-coded location is used.
rootdir = os.environ.get('DEID_ROOTDIR', '/home/r079a/Desktop/de-identification/')

# Series folders: the raw input and the copy that is later loaded as `deid_ds`.
raw_img_path = Path(rootdir, 'dataset/images/manifest-1617826555824',
                    'Pseudo-PHI-DICOM-Data/6670427471/05-26-2000-NA-FORFILE CT ABD ANDOR PEL - CD-25398/5.000000-NEPHRO  4.0  B40f  M0.4-18678')
deid_series_path = Path(rootdir, 'dataset/images-2/manifest-1617826161202',
                        'Pseudo-PHI-DICOM-Data/Pseudo-PHI-010/05-09-1990-NA-CT ABD ANDOR PEL - CD-10243/5.000000-NEPHRO  4.0  B40f  M0.4-32617')
output_path = Path(rootdir, 'dicom-output/v2_outputs')

first_dcm_name = '1-001.dcm'

# File-level paths used by the following cells. The series folder gets its
# own name so `deid_img_path` always refers to a file, never a directory.
sample_img_path = Path(raw_img_path, first_dcm_name)
deid_img_path = Path(deid_series_path, first_dcm_name)
output_file = Path(output_path, first_dcm_name)

print(sample_img_path)

/home/r079a/Desktop/de-identification/dataset/images/manifest-1617826555824/Pseudo-PHI-DICOM-Data/6670427471/05-26-2000-NA-FORFILE CT ABD ANDOR PEL - CD-25398/5.000000-NEPHRO  4.0  B40f  M0.4-18678/1-001.dcm


In [3]:
from dcm_anonymizers.tcia_deid import DCMTCIAAnonymizer
from dcm_anonymizers.phi_detectors import DcmPHIDetector, DcmRobustPHIDetector
from dcm_anonymizers.ps_3_3 import format_action_dict, replace_with_value

In [4]:
# Rules file handed to DCMTCIAAnonymizer below through `rules_json_path`.
# NOTE(review): this is not the path the rules-export cell writes
# ('../docs/tcia_supplied_deid_attrs.json') - confirm the two files are
# meant to differ.
TCIA_DEID_ATTRS_JSON = '../dcm_anonymizers/tcia_deid_attrs.json'

In [5]:
# NOTE(review): `phi_detector` is instantiated here, but the anonymizer below
# receives phi_detector=None and notes_phi_detector=None; in the visible
# notebook the instance is only referenced from commented-out cells.
phi_detector = DcmRobustPHIDetector()
# Build the anonymizer from the JSON rules file, with custom actions enabled.
anonymizer = DCMTCIAAnonymizer(
    phi_detector=None,
    rules_json_path = TCIA_DEID_ATTRS_JSON,
    notes_phi_detector=None,
    apply_custom_actions=True,
)

08/22/2024 18:21:41 - DEBUG - dcm_anonymizers.ps_3_3 - PS3.3 init
08/22/2024 18:21:41 - DEBUG - dcm_anonymizers.ps_3_3 - TCIA anonymizer init


In [6]:
# Both tags below are overwritten with the same fixed pseudonym instead of
# going through the rule-driven actions.
pseudonym_tags = ("(0x0010, 0x0010)", "(0x0010, 0x0020)")
patient_attrs_action = format_action_dict(
    {tag: replace_with_value(['Pseudo-PHI-007']) for tag in pseudonym_tags}
)

# Anonymize the sample file into `output_file`. The call hands back the
# per-tag action history and the list of (tag, action) pairs it ignored.
history, ignored_tags = anonymizer.anonymize(
    input_path=str(sample_img_path),
    output_path=str(output_file),
    custom_actions=patient_attrs_action,
)

{(0018, 1200): 'tcia_delete', (0008, 0020): 'replace', (0008, 0021): 'replace', (0008, 0022): 'replace', (0008, 0023): 'replace', (0008, 002a): 'replace', (0010, 21d0): 'replace', (0040, 0244): 'replace', (0008, 0018): 'replace_UID', (0008, 3010): 'replace_UID', (0020, 000d): 'replace_UID', (0020, 000e): 'replace_UID', (0020, 0052): 'replace_UID', (0088, 0140): 'replace_UID', (0008, 0050): 'empty', (0008, 0090): 'empty', (0010, 0030): 'empty', (0010, 0010): 'apply_replace_with_value', (0010, 0020): 'apply_replace_with_value'}


In [7]:
# import re

# def clear_mistaken_highlights(textpart: str):
#     pattern = re.compile(r'<<(PATIENT|STAFF|AGE|DATE|LOCATION|PHONE|ID|EMAIL|PATORG|HOSPITAL|OTHERPHI):((.)*)?>>', re.DOTALL)
#     matches = re.search(pattern, textpart)
#     if matches:
#         textpart = textpart.replace(matches.group(0), matches.group(2))

#     return textpart
        

In [8]:
# text = """\
# Private Creator: GEMS_PETD_01,\nGE Discovery PET Implementation Version Name: GE Advance,\nPET compatible_version: 05.00,\n\
# GE Advance Patient.software_version: 16.01,\nPET compatible_version: 05.00,\nPET software_version: 16.01,\nPET compatible_version: 05.00,\n\
# PET software_version: 16.01,\nPET scan_description: PET CT INFERIOR TO SUPERIOR,\nPET scanner_desc: Discovery LS,\n\
# PET manufacturer: GE MEDICAL SYSTEMS,\n\
# PET landmark_name: Orbital Meatal Line,\n\
# PET landmark_abbrev: OM,\n\
# PET tracer_name: FDG -- fluorodeoxyglucose,\n\
# PET radionuclide_name: 18F,\n\
# PET compatible_version: 05.00,\n\
# PET software_version: 16.01,\n\
# PET compatible_version: 05.00,\n\
# PET software_version: 16.01,\n\
# PET compatible_version: 05.00,\n\
# PET software_version: 16.01,\n\
# Private Creator: GEMS_PETD_01,\n\
# Private Creator: CTP,\n\
# Private tag data: Pseudo-PHI-DICOM-Data,\n\
# Private tag data: 87009668\n\
# """


# outputs = phi_detector.run_deid([text])

# entities = []
# current = 0

# for item in outputs[0]:
#     itemval = item[0]
#     itemval = clear_mistaken_highlights(itemval)
    
#     if item[1]:
#         found_in = text[current:].find(itemval)
#         start = current + found_in
#         entity = (itemval, item[1], start)
#         entities.append(entity)

#         assert text[start:start+len(itemval)] == itemval, "segmenting entities from note text mismatch"

#     current += len(itemval)

# print(entities)

In [9]:
print(history)

{(0018, 1200): 'tcia_delete', (0008, 0020): 'replace', (0008, 0021): 'replace', (0008, 0022): 'replace', (0008, 0023): 'replace', (0008, 002a): 'replace', (0010, 21d0): 'replace', (0040, 0244): 'replace', (0008, 0018): 'replace_UID', (0008, 3010): 'replace_UID', (0020, 000d): 'replace_UID', (0020, 000e): 'replace_UID', (0020, 0052): 'replace_UID', (0088, 0140): 'replace_UID', (0008, 0050): 'empty', (0008, 0090): 'empty', (0010, 0030): 'empty', (0010, 0010): 'apply_replace_with_value', (0010, 0020): 'apply_replace_with_value'}


In [10]:
print(ignored_tags)

[((0009, 0010), 'replace'), ((0013, 0010), 'replace'), ((0013, 1010), 'replace'), ((0013, 1013), 'replace'), ((0029, 0010), 'replace'), ((0029, 0011), 'replace'), ((0029, 0012), 'replace'), ((0903, 0010), 'replace'), ((0905, 0010), 'replace'), ((7fd1, 0010), 'replace'), ((0008, 2111), 'replace'), ((0020, 4000), 'replace'), ((0008, 1030), 'keep'), ((0008, 103e), 'keep'), ((0008, 1080), 'keep'), ((0010, 0040), 'keep'), ((0010, 21a0), 'keep'), ((0018, 0010), 'keep'), ((0018, 1030), 'keep'), ((0032, 1060), 'keep'), ((0040, 0254), 'keep')]


In [11]:
import pydicom
from pydicom import dcmread

In [12]:
# Load the raw sample file so the ignored tags can be looked up in it below.
with open(sample_img_path, 'rb') as infile:
    dataset = dcmread(infile)

In [13]:
all_text = ''  # NOTE(review): never read in the visible cells
# Show the text form of every element the anonymizer reported as ignored.
for tag_tuple in ignored_tags:
    ignored_tag = tag_tuple[0]  # entries are (tag, action) pairs
    print(DcmRobustPHIDetector.element_to_text(dataset.get(ignored_tag)))

Private Creator: GEIIS
Private Creator: CTP
Private tag data: Pseudo-PHI-DICOM-Data
Private tag data: 87009668
Private Creator: SIEMENS CSA HEADER
Private Creator: SIEMENS MEDCOM HEADER
Private Creator: SIEMENS MEDCOM OOG
Private Creator: GEIIS PACS
Private Creator: GEIIS
Private Creator: GEIIS
Derivation Description: SIEMENS_MED_CT_DUAL_ENERGY
Image Comments: Sn ULTRAVIST 240/200ML
NS/100ML, ISTAT=0.7 mg/dL
Study Description: FORFILE CT ABD AND/OR PEL - CD
Series Description: NEPHRO  4.0  B40f  M_0.4
Admitting Diagnoses Description: HEMATURIA
Patient's Sex: M
Smoking Status: NO
Contrast/Bolus Agent: ULTRAVIST 240
Protocol Name: DE_HEMATURIA
Requested Procedure Description: CT ABDOMEN & PELVIS W/WO IVC
Performed Procedure Step Description: CT ABDOMEN & PELVIS W/WO IVC


In [14]:
# Accumulator filled by extract_tags(); one tuple per compared element.
tagvalues = []


def _first_sequence_item(seq_element):
    """Return the first item of a sequence element, or None if it is missing or empty."""
    if not seq_element:
        return None
    items = seq_element.value
    return items[0] if len(items) > 0 else None


def extract_tags(dcm, gt_ds, annon_ds):
    """Collect element values from three parallel DICOM objects into ``tagvalues``.

    Walks the elements of ``dcm`` and, for each one, looks up the element with
    the same tag in ``gt_ds`` and ``annon_ds``. Elements with VR ``'OW'`` are
    skipped and elements with VR ``'SQ'`` are descended into recursively.
    For every other element a tuple
    ``(tag, VR, name, raw value, gt value, anonymizer value, changed)`` is
    appended to the module-level ``tagvalues`` list, where ``changed`` is True
    when the ``gt_ds`` and ``annon_ds`` values differ as strings. A missing
    counterpart is recorded as ``"Not available"``.

    Args:
        dcm: The dataset to walk, or (on recursion) a sequence data element.
        gt_ds: Counterpart of ``dcm`` in the reference file, or None.
        annon_ds: Counterpart of ``dcm`` in the anonymizer output, or None.

    Returns:
        None. Results accumulate in ``tagvalues``, so calling this twice
        without emptying the list duplicates rows.
    """
    elements = dcm
    gt_elements = gt_ds
    annon_elements = annon_ds
    parent_tag = None

    if isinstance(dcm, pydicom.dataelem.DataElement):
        parent_tag = dcm.tag
        items = dcm.value
        if len(items) == 0:
            # Empty sequence: nothing to compare (indexing [0] used to raise).
            return
        # NOTE(review): only the first item of each sequence is inspected;
        # further items are not compared.
        elements = items[0]
        # The counterparts may be absent or hold an empty sequence.
        gt_elements = _first_sequence_item(gt_ds)
        annon_elements = _first_sequence_item(annon_ds)

    for element in elements:
        deidelem = gt_elements.get(element.tag) if gt_elements else None
        dcmannonelem = annon_elements.get(element.tag) if annon_elements else None

        if element.VR == 'OW':
            continue
        elif element.VR == 'SQ':
            extract_tags(element, deidelem, dcmannonelem)
            continue

        deidval = "Not available"
        if deidelem:
            deidval = str(deidelem.value)
        dcmannonval = "Not available"
        if dcmannonelem:
            dcmannonval = str(dcmannonelem.value)
        changed = dcmannonval != deidval

        # Nested elements are labelled "<parent tag> - <own tag>".
        element_tag_str = str(element.tag)
        if parent_tag:
            element_tag_str = f"{str(parent_tag)} - {str(element.tag)}"
        values_tuple = (element_tag_str, element.VR, element.name, str(element.value), deidval, dcmannonval, changed)
        tagvalues.append(values_tuple)

In [15]:
# Raw input file, i.e. the one that was passed to anonymizer.anonymize().
with open(sample_img_path, 'rb') as infile:
    raw_ds = dcmread(infile)

# Already de-identified copy; used as the "Ground Truth" column further down.
with open(deid_img_path, 'rb') as deidfile:
    deid_ds = dcmread(deidfile)

# File written by anonymizer.anonymize() (its output_path was `output_file`).
with open(output_file, 'rb') as outfile:
    anon_ds = dcmread(outfile)

In [16]:
# extract_tags() appends to the module-level `tagvalues` list, so empty it
# first; otherwise every re-run of this cell duplicates all rows.
tagvalues.clear()
extract_tags(raw_ds, deid_ds, anon_ds)

valus_df = pd.DataFrame(tagvalues, columns =['Tag', 'VR', 'Name', 'Raw Metadata', 'Ground Truth Anonimization', 'TCIA deId Anonymizer', 'Changed'])

pd.set_option('display.max_rows', 75)

# Rows where the anonymizer output differs from the ground-truth value.
changed_df = valus_df[valus_df['Changed']]
changed_df

Unnamed: 0,Tag,VR,Name,Raw Metadata,Ground Truth Anonimization,TCIA deId Anonymizer,Changed
3,"(0008, 0018)",UI,SOP Instance UID,2.25.192064249168232522825882814377465855478,1.3.6.1.4.1.14519.5.2.1.8700.9668.216701533061...,1.2.826.0.1.3680043.8.498.97583266466505267626...,True
4,"(0008, 0020)",DA,Study Date,20000526,19900509,20000923,True
5,"(0008, 0021)",DA,Series Date,20000526,19900509,20000923,True
6,"(0008, 0022)",DA,Acquisition Date,20000526,19900509,20000923,True
7,"(0008, 0023)",DA,Content Date,20000526,19900509,20000923,True
8,"(0008, 002a)",DT,Acquisition DateTime,20000526125439.765000,19900509,20000923125439.765000,True
21,"(0008, 1030)",LO,Study Description,FORFILE CT ABD AND/OR PEL - CD,CT ABD AND/OR PEL - CD,FORFILE CT ABD AND/OR PEL - CD,True
25,"(0008, 1032) - (0008, 0104)",LO,Code Meaning,FORFILE CT ABD AND/OR PEL - CD,CT ABD AND/OR PEL - CD,FORFILE CT ABD AND/OR PEL - CD,True
30,"(0008, 1110) - (0008, 1150)",UI,Referenced SOP Class UID,1.2.840.10008.3.1.2.3.1,1.2.840.10008.3.1.2.3.1,2.25.298976980049738810937058232965138021563,True
31,"(0008, 1110) - (0008, 1155)",UI,Referenced SOP Instance UID,2.25.98465855254525200775877816255333907958,1.3.6.1.4.1.14519.5.2.1.8700.9668.191160684599...,2.25.289035450724840812731755438066430323910,True


In [17]:
DATE_TIME_VRS = ('DA', 'DT', 'TM')
# Tags overwritten with a custom value in the anonymize() call above.
CUSTOM_REPLACED_TAGS = ('(0010, 0010)', '(0010, 0020)')


def _counts_as_mismatch(row) -> bool:
    """Tell whether one comparison row is a real disagreement with the ground truth."""
    gt_val = row['Ground Truth Anonimization']
    target_val = row['TCIA deId Anonymizer']

    if gt_val == target_val:
        return False

    both_filled = not (gt_val == "" or target_val == "")
    vr = row['VR']
    if vr == 'UI' and both_filled:
        # Two different non-empty UIDs are accepted.
        return False
    if vr in DATE_TIME_VRS and both_filled:
        # Date/time values may differ; only a different length counts.
        return len(gt_val) != len(target_val)
    return row['Tag'] not in CUSTOM_REPLACED_TAGS


mismatched_tags = [
    row['Name'] for _, row in valus_df.iterrows() if _counts_as_mismatch(row)
]
n_mismatched = len(mismatched_tags)

print(n_mismatched)
print(mismatched_tags)

3
['Acquisition DateTime', 'Study Description', 'Code Meaning']


In [20]:
# def str_element_val(element):
#     if isinstance(element.value, bytes):
#         return element.value.decode("utf-8")
#     return str(element.value)


# for element in raw_ds:
#     if element.VR == 'OW':
#             continue
#     if element.tag.is_private:
#         target_val = 'Not Availble'
#         deid_el = deid_ds.get(element.tag)
#         if deid_el:
#             target_val = str_element_val(deid_el)
#         print(f"{element.tag} {element.VR} {element.name}: {element.value} -> {target_val}")

In [30]:
# for element in raw_ds:
#     if element.VR == 'OW':
#             continue
#     if element.tag.is_private:
#         entities = phi_detector.detect_entities_from_element(element)
#         if len(entities) > 0:
#             print(element)
#             print(entities)

In [31]:
# all_texts = ''
# tag_position_map = {}

# for element in raw_ds:
#     if element.VR == 'OW':
#             continue
#     if element.tag.is_private:
#         if element.VR in ("LO", "LT", "SH", "PN", "CS", "ST", "UT", "UN") and element.value != "":
#             element_val = phi_detector.process_element_val(element)
#             element_name = phi_detector.processed_element_name(element.name)
#             element_text = f"{element_name}: {element_val}"
#             start = len(all_texts) + len(element_name) + 1
#             end = start + len(element_name)
#             tag_position_map[element.tag] = (start, end)
#             if all_texts == '':
#                 all_texts += f"{element_text}"
#             else:
#                 all_texts += f", {element_text}"

# print(all_texts)

# print(tag_position_map)

In [32]:
# entities = phi_detector.detect_entities(all_texts)
# # print(entities)
# element_target = {}

# for e in entities:
#     e_start = e[2]
#     for t in tag_position_map:
#         if e_start >= tag_position_map[t][0] and e_start <= tag_position_map[t][1]:
#             if t in element_target:
#                 element_target[t].append(e[0])
#             else:
#                 element_target[t] = [e[0]]

# print(element_target)

In [33]:
# def replace_element_values_from_entity_values(elemval, entity_values: list):
#     elemval = str(elemval)
#     n_words = len(elemval.split())
#     deid_val = elemval[:]
#     if n_words == 1:
#         elem_n_chars = len(elemval)
#         entity_n_chars = len(''.join(entity_values))
#         if elem_n_chars < 2*entity_n_chars:
#             deid_val = ""
#     else:
#         for entity_val in entity_values:
#             deid_val = deid_val.replace(entity_val, '', 1)
            
#         remaining_value_prcnt = len(deid_val) / len(elemval)
#         # return empty string in case of value almost stripped by anonymizer
#         if remaining_value_prcnt <= 0.2:
#             deid_val = ""

#     return deid_val

In [34]:
# for tag in element_target:
#     element = raw_ds.get(tag)
#     deid_val = replace_element_values_from_entity_values(element.value, element_target[tag])

#     print(deid_val)