In [None]:
import os
from datetime import datetime
import numpy as np
import pandas as pd
from scipy import signal
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
from pydicom import dcmread
from pydicom.waveforms import multiplex_array

In [None]:
class ECGDICOMReader:
    """Extract lead voltages and basic metadata from an ECG DICOM file.

    Instances are callable: ``reader(path)`` parses the file at ``path`` and
    returns a plain dict (see ``readable_dict``), or ``None`` when the file
    cannot be read or parsed.

    Author: Philip Croon, p.croon@amsterdamumc.nl
    for questions feel free to email.
    """

    # Sampling frequency (Hz) that recordings are brought to when
    # ``resample_500`` is enabled.
    TARGET_SF = 500

    def __init__(self, augmentLeads=False, resample_500=True):
        """Configure the reader.

        Args:
            augmentLeads: If True and the file holds exactly 8 channels,
                leads III, aVR, aVL and aVF are computed from leads I and II.
            resample_500: If True, recordings whose sampling frequency is not
                500 Hz are resampled to 500 Hz.
        """
        self.augmentLeads = augmentLeads
        self.resample_500 = resample_500
        print("Initialization succesfull")

    def __call__(self, path):
        """Read one ECG DICOM file.

        Args:
            path: Location of the DICOM file on disk.

        Returns:
            The dict built by ``readable_dict``, or ``None`` if any step
            raised. Failures are reported on stdout rather than propagated,
            so callers must check for ``None``.
        """
        try:
            with open(path, 'rb') as DICOM:
                self.ECG = dcmread(DICOM)
            self.PatientID = self.ECG.PatientID
            self.PatientBirthDate = self.ECG.PatientBirthDate
            self.PatientName = self.ECG.PatientName
            self.PatientSex = self.ECG.PatientSex
            self.StudyDate = self.ECG.StudyDate
            self.StudyTime = self.ECG.StudyTime
            # Transposed so that iterating yields one channel at a time
            # (assumes waveform_array returns (samples, channels), as
            # documented by pydicom - TODO confirm for your pydicom version).
            self.Waveforms = self.ECG.waveform_array(0).T
            self.lead_info_final = self.lead_info()
            self.LeadVoltages = self.makeleadvoltages()
            self.sf = self.ECG.WaveformSequence[0].SamplingFrequency
            self.samplingfrequency = self.resampling_500hz()
            self.read_dict_final = self.readable_dict()

            return self.read_dict_final

        except Exception as e:
            # Best-effort by design: one unreadable file must not abort a
            # batch run. Report which file failed and why, then signal the
            # failure explicitly.
            print(f"Could not read ECG DICOM {path!r}: {type(e).__name__}: {e}")
            return None

    def readable_dict(self):
        """Collect the parsed attributes into a plain dict.

        Returns:
            A dict with keys "PatientID", "PatientBirthDate", "PatientSex",
            "StudyDate" (reformatted as YYYY-MM-DD), "StudyTime",
            "Sampling frequency", "Sample shape" (number of samples in the
            first lead, 0 if there are no leads) and "Waveforms" (lead name
            -> voltage array). PatientName is not included.

        Raises:
            ValueError: If ``StudyDate`` is not in ``YYYYMMDD`` format.
        """
        first_lead = next(iter(self.LeadVoltages.values()), None)
        return {
            "PatientID": self.PatientID,
            "PatientBirthDate": self.PatientBirthDate,
            "PatientSex": self.PatientSex,
            # If your files store the date in a different format, adapt the
            # parse pattern here.
            "StudyDate": datetime.strptime(self.StudyDate, "%Y%m%d").strftime('%Y-%m-%d'),
            "StudyTime": self.StudyTime,
            "Sampling frequency": self.samplingfrequency,
            "Sample shape": 0 if first_lead is None else len(first_lead),
            "Waveforms": self.LeadVoltages,
        }

    def makeleadvoltages(self):
        """Map each channel of ``self.Waveforms`` to its lead name.

        When the reader was created with ``augmentLeads=True`` and exactly 8
        channels are present, leads III, aVR, aVL and aVF are derived from
        leads I and II.

        Returns:
            Dict of lead name -> voltage array.

        Raises:
            KeyError: If augmentation is requested but the channel names do
                not include "I" and "II".
        """
        leads = {
            self.lead_info_final[index]: samples
            for index, samples in enumerate(self.Waveforms)
        }

        if len(self.Waveforms) == 8 and self.augmentLeads:
            lead_i, lead_ii = leads['I'], leads['II']
            leads['III'] = np.subtract(lead_ii, lead_i)
            leads['aVR'] = np.add(lead_i, lead_ii) * (-0.5)
            leads['aVL'] = np.subtract(lead_i, 0.5 * lead_ii)
            leads['aVF'] = np.subtract(lead_ii, 0.5 * lead_i)
        return leads

    def lead_info(self):
        """Return the channel names of the first waveform group.

        Returns:
            Dict of channel index -> ``CodeMeaning`` of the channel's first
            source-sequence item.
        """
        channels = self.ECG.WaveformSequence[0].ChannelDefinitionSequence
        return {
            index: channel.ChannelSourceSequence[0].CodeMeaning
            for index, channel in enumerate(channels)
        }

    def resampling_500hz(self):
        """Resample all leads to 500 Hz when requested and necessary.

        The number of output samples is derived from each lead's length and
        the ratio of target to source frequency, so the recording keeps its
        duration whatever its length. ``self.oversampled`` is set to "Yes"
        when resampling happened and "No" otherwise.

        Returns:
            The sampling frequency of ``self.LeadVoltages`` after the call,
            as an int.

        Raises:
            ValueError: If resampling is needed but ``self.sf`` is not
                positive.
        """
        current_sf = int(float(self.sf))
        # Reset on every call so a flag left by a previous file cannot linger.
        self.oversampled = "No"
        if not self.resample_500 or current_sf == self.TARGET_SF:
            return current_sf

        source_sf = float(self.sf)
        if source_sf <= 0:
            raise ValueError(f"Invalid sampling frequency: {self.sf!r}")

        ratio = self.TARGET_SF / source_sf
        # list(...) snapshots the items so the dict can be updated in place.
        for name, samples in list(self.LeadVoltages.items()):
            target_len = int(round(len(samples) * ratio))
            self.LeadVoltages[name] = signal.resample(samples, target_len)
        self.oversampled = "Yes"
        self.sf = self.TARGET_SF
        return self.sf

In [None]:
# Build a reader with the default options (augmentLeads=False, resample_500=True).
ecgreader = ECGDICOMReader()

In [None]:
# Parse one ECG DICOM file. The reader catches its own errors and prints them,
# so `dicom` is None when the file could not be read.
# NOTE(review): `path_to_dicom` is not defined in any visible cell - define it
# (e.g. in a config cell) before running this one.
dicom = ecgreader(path_to_dicom)

In [None]:
# Display the parsed record.
# NOTE(review): the dict contains PatientID, PatientBirthDate and the full
# waveform arrays - clear this output before sharing the notebook.
dicom

In [None]:
# Plot lead I with a title and axis labels so the figure stands on its own;
# plt.show() keeps the Line2D repr out of the cell output.
fig, ax = plt.subplots()
ax.plot(dicom["Waveforms"]["I"])
# The y-axis unit is not established by the reader, hence the generic label.
ax.set(title="Lead I", xlabel="Sample", ylabel="Amplitude")
plt.show()