/
DICOM_anonymizer.py
85 lines (62 loc) · 2.23 KB
/
DICOM_anonymizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import random
import string
import pydicom
import os.path
import argparse
from os import walk
from joblib import Parallel, delayed, cpu_count
__author__ = 'Alessandro Delmonte'
__email__ = 'delmonte.ale92@gmail.com'
def main():
dir_path, q = setup()
if q:
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
print('Parallel computing enabled')
with Parallel(n_jobs=cpu_count(), backend='threading') as parallel:
parallel(delayed(anonymize)(root, files) for root, _, files in walk(dir_path))
def anonymize(root, files):
files = [os.path.join(root, file) for file in files]
random_string = ''.join(random.choice(string.ascii_uppercase) for _ in range(8))
for file in files:
try:
ds = pydicom.dcmread(file)
ds.walk(del_callback)
ds.data_element('PatientName').value = random_string
ds.data_element('PatientID').value = "(??)"
ds.data_element('InstitutionName').value = "(??)"
ds.data_element('SeriesDescription').value = "(??)"
ds.data_element('ProtocolName').value = "(??)"
for tag in ['PatientWeight', 'AdditionalPatientHistory']:
if tag in ds:
delattr(ds, tag)
os.remove(file)
ds.save_as(file)
except:
pass
def del_callback(ds, data_element):
if data_element.VR == 'PN':
data_element.value = 'ANONYMOUS'
if data_element.VR == 'DA':
data_element.value = "20000101"
if data_element.VR == 'TM':
data_element.value = "000000"
if data_element.VR == 'SH':
data_element.value = "(??)"
def setup():
parser = argparse.ArgumentParser()
parser.add_argument('DICOM_Folder', help='Database to anonimyze', type=check_folder)
parser.add_argument('-q', '--quiet', help='Suppress output', action='store_true')
args = parser.parse_args()
return args.DICOM_Folder, args.quiet
def check_folder(value):
if not os.path.isdir(value):
raise argparse.ArgumentTypeError('Path is not a directory: %s' % value)
else:
return value
if __name__ == '__main__':
main()
sys.exit()