# Read S5000F message metadata

<ol><li>Extract metadata of an XML message by processing its header and trailer parts.<li>Store message metadata in a SQL database to ensure its persistence<li>Create an XML message header and trailer parts from an input dataframe.</ol>

[HTML version](./Create_Hums_message.html)

**Date** : 07/05/2020 

**Program Version** : 1.0  
**Python Version**  : 3.8.1

**Source repository** : https://github.com/BernardRaust/Bicycle-trek-Chemin-de-la-Simone

**Support** : <mailto:bernard.raust@gmail.com>

---------

This Python module reads an S5000F message to extract header and trailer information.

**Input**:     XML message<br>
**Parameter**:     Series (name of information,xpath of information in XML) <br>
**Output**:     Series (name of information, value of information) <br>



---------

In [234]:
import argparse
import json
import os
import re
import traceback
from functools import reduce
from time import gmtime, strftime

import lxml.etree as etree
import numpy as np
import pandas as pd

# Module metadata: authorship, version, maintainer contact and release status.
__author__ = "Bernard Raust"
__credits__ = ["Bernard Raust"]
__version__ = "1.0.0"
__maintainer__ = "Bernard Raust"
__email__ = "bernard.raust@gmail.com"
__status__ = "Development"

In [330]:
# XPath of every piece of information located in the S5000F message header and trailer.
# Keys are the information names; values are paths that start at the
# 'n1:isfDataset' root element and end on a text node or an attribute.
header_dict =  {'msgUid':          'n1:isfDataset/@uid',
                'msgId':           'n1:isfDataset/msgId/id/text()',
                'msgCode':         'n1:isfDataset/msgType/code/text()',
                'msgDate':         'n1:isfDataset/msgDate/date/text()',
                'msgTime':         'n1:isfDataset/msgDate/time/text()',
                'msgStatus':       'n1:isfDataset/msgStatus/state/text()',
                'msgContext':      'n1:isfDataset/msgContext/context/projRef/id/text()',
                'msgRelation':     'n1:isfDataset/relatedMsg/relType/code/text()',
                'relatedMsg':      'n1:isfDataset/relatedMsg/msgRef/msgId/id/text()',                
                'msgSenderCode':   'n1:isfDataset/msgPty[1]/ptyType/code/text()',
                'msgSenderId':     'n1:isfDataset/msgPty[1]/party/persRef/persId/id/text()',
                'msgReceiverCode': 'n1:isfDataset/msgPty[2]/ptyType/code/text()',
                'msgReceiver':     'n1:isfDataset/msgPty[2]/party/persRef/persId/id/text()',
                'msgRemark':       'n1:isfDataset/rmk/text/descr/text()',
                'msgSecurity':     'n1:isfDataset/secs/sec/secClassDefRef/secClass/name/text()'}

# Same mapping as a table: index = information name, single column 'xpath'.
xpath_df = pd.DataFrame.from_dict(header_dict,orient='index',columns=['xpath'])
#xpath_df.index.name ='name'

# S5000F XML namespace: maps the 'n1' prefix used in the paths above to its URI.
nmsp ={'n1':"http://www.asd-europe.org/s-series/s5000f"}

In [331]:
xpath_df

Unnamed: 0,xpath
msgUid,n1:isfDataset/@uid
msgId,n1:isfDataset/msgId/id/text()
msgCode,n1:isfDataset/msgType/code/text()
msgDate,n1:isfDataset/msgDate/date/text()
msgTime,n1:isfDataset/msgDate/time/text()
msgStatus,n1:isfDataset/msgStatus/state/text()
msgContext,n1:isfDataset/msgContext/context/projRef/id/te...
msgRelation,n1:isfDataset/relatedMsg/relType/code/text()
relatedMsg,n1:isfDataset/relatedMsg/msgRef/msgId/id/text()
msgSenderCode,n1:isfDataset/msgPty[1]/ptyType/code/text()


In [237]:
# Template of an empty S5000F message: a lone n1:isfDataset root element carrying
# the namespace and schema-location declarations (the string starts with a newline).
file_header='''
<n1:isfDataset crud="I" xsi:schemaLocation="http://www.asd-europe.org/s-series/s5000f ../00_XSD_Version_2.0/s5000f_2-0_isfdataset.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:n1="http://www.asd-europe.org/s-series/s5000f"></n1:isfDataset>'''

## READ HEADER

In [332]:
class Read_Header():
    '''
    Get metadata information of a S5000F message
    Inputs: 
        ** xmlfile: pathname of S5000F xml message file
        ** header_df: either a pandas dataframe having
             information name as index, 
             column 'xpath' locating this information in xml message,
           or a pandas Series / dict mapping information name -> xpath
    Output (printed on construction and kept on the instance): 
        ** info_df is a pandas dataframe having:
             information name as index,
             column 'value' of information which is located by xpath
             (empty string when the xpath matches nothing)
        ** info is the same data as a dictionary {name: value}
    '''
    def __init__(self, xmlfile, header_df):
        self._header = header_df
        self._info = {}

        # DataFrame.items() iterates over columns, not rows, so pick the
        # 'xpath' column explicitly; a Series or dict already yields
        # (name, xpath) pairs.
        if isinstance(header_df, pd.DataFrame):
            xpaths = header_df['xpath']
        else:
            xpaths = header_df

        # Binary mode: the XML parser applies the encoding declared in the
        # document instead of the platform default text encoding.
        with open(xmlfile, 'rb') as _fd:
            _root = etree.parse(_fd).getroot()

        for name, xpath in xpaths.items():
            # The stored paths start with the root element name, hence the
            # leading "../" to step above the root before evaluating them.
            val = _root.xpath("../" + xpath, namespaces=nmsp)
            # Only the first match is kept; a missing item becomes ''.
            self._info[name] = val[0] if val else ''

        self.info_df = pd.DataFrame.from_dict(self._info, orient='index', columns=['value'])
        print(self.info_df)

    @property
    def info(self):
        '''Dictionary {information name: value} read from the message (a copy).'''
        return dict(self._info)

    def __str__(self):
        '''Return the name/value table as text.'''
        return self.info_df.to_string()

In [333]:
# Get an XML file to test Read_Header
import glob
import os
list_of_input_files = glob.glob('../Input_folder/*.xml')   # get xml input files to be processed

# max() over getctime selects the file with the LATEST ctime (the most recent
# one). NOTE(review): the original comment said "oldest" - use min() if the
# oldest file is what was intended. Raises ValueError when no file matches.
xmlfile = max(list_of_input_files, key=os.path.getctime)

# Show only the file name (directory part is not used afterwards in this cell).
pathname,filename = os.path.split(xmlfile)
print(f"'{filename}'")
print()

'msg1234567890.xml'



In [334]:
# Read the header/trailer metadata of the selected file.
# NOTE(review): `xpath_series` is not defined in the visible part of this file
# (the table built earlier is named `xpath_df`) - confirm where it comes from.
aa = Read_Header(xmlfile,xpath_series)

                                                             value
msgUid                                               msg1234567890
msgId            BIKE TREK AIX-EN-PROVENCE ON 2020-02-25T06:27:...
msgCode                                                    UC50902
msgDate                                                 2020-03-13
msgTime                                                16:00:13.0Z
msgStatus                                                        F
msgContext                                                        
msgRelation                                                       
relatedMsg                                                        
msgSenderCode                                                    S
msgSenderId                                     Guillaume Ollivier
msgReceiverCode                                                  R
msgReceiver                                          Bernard Raust
msgRemark                                                     

## WRITE HEADER

In [312]:
# Split one step of a path (like 'Item/Purchases/Purchase[@Country="CA"][@Language="ES"]/SDPrice')
# into its tag name and a dictionary of attributes

TAG_REGEX = r"(?P<tag>\w+)"                         # capture group named tag. It is tag name
CONDITION_REGEX = r"(?P<condition>(?:\[.*?\])*)"    # capture group named condition. Zero or more [...] predicates
STEP_REGEX = TAG_REGEX + CONDITION_REGEX
ATTR_REGEX = r"@(?P<key>\w+)=\"(?P<value>.*?)\""    # one @key="value" pair inside a predicate

search_step = re.compile(STEP_REGEX, flags=re.DOTALL).search
findall_attr = re.compile(ATTR_REGEX, flags=re.DOTALL).findall


def parse_step(step):
    """Return ``(tag, attributes)`` for a single '/'-separated path step.

    ``tag`` is the first run of word characters found in *step*;
    ``attributes`` maps every ``@key="value"`` pair found in the ``[...]``
    predicates that directly follow the tag. Predicates without such a pair
    (e.g. a positional ``[1]``) contribute nothing.

    Raises:
        ValueError: if *step* contains no tag name at all (e.g. '' or '..').
    """
    mo = search_step(step)
    if mo is None:
        raise ValueError(step)
    return mo.group("tag"), dict(findall_attr(mo.group("condition")))

In [91]:
class Write_Header():
    '''
    Build header and trailer S5000F message
    Inputs:        
        ** header/trailer info as a dictionary {key: 'information name' value: 'value in xml message'}
        ** params is a dictionary {key: 'information name' value: 'xpath in xml message'}
    Output: 
        ** the module-level `root` element is filled in place, printed, and
           also kept as `self.root` (no file is written by this class)

    Only names present in both dictionaries are written. The first step of
    every path is skipped because it designates the already existing root.
    A final 'text()' step sets the text of the parent element and a final
    '@name' step sets an attribute on it.
    '''
    def __init__(self, info, params):
        # create a dataframe to match dictionaries information and pathname
        df = pd.DataFrame.from_dict(params,orient='index',columns=['Path'])
        df = df.join(pd.DataFrame.from_dict(info,orient='index',columns=['Value']))
        df1 = df.dropna()   # drop names that have a path but no value
        for row in df1.itertuples():
            steps = row.Path.split("/")[1:]  # 1st pathname elt not processed because root elt already created
            # A trailing 'text()' or '@attr' step addresses the value itself,
            # not an element, so it must not be created as a child node.
            leaf = None
            if steps and (steps[-1] == 'text()' or steps[-1].startswith('@')):
                leaf = steps.pop()

            curr = root
            for step in steps:
                tag_name, attrs = parse_step(step)
                nodes = curr.xpath(step, namespaces=nmsp)
                if nodes:
                    curr = nodes[0]      # reuse the existing element
                else:
                    curr = etree.SubElement(curr, tag_name, **attrs)

            value = str(row.Value)
            if leaf is not None and leaf.startswith('@'):
                curr.set(leaf[1:], value)
                continue
            # The target already holds a value: write into a new sibling
            # element with the same tag and attributes instead of overwriting.
            if curr.text:
                curr = etree.SubElement(curr.getparent(), curr.tag, **curr.attrib)
            curr.text = value

        self.root = root
        print(df1)
        print(etree.tostring(root, pretty_print=True).decode())

In [92]:
# Build the (initially empty) output message from the `file_header` template.
xsd=etree.fromstring(file_header)   # parse the template string into an element 'xsd'
message = etree.ElementTree(xsd)    # create a document tree 'message' around that element
root = message.getroot()            # get root element; Write_Header reads this global

## Test of Read_Header and Write_Header classes
Based on a common dictionary (information name, information xpath, information value):<ol><li>process an XML message to get, for each 'information name' located by its 'xpath', the 'information value'<li>write an XML message using the previous dictionary<li>compare the incoming and outgoing XML messages</ol>

In [93]:
import glob
import os
list_of_input_files = glob.glob('../Input_folder/*.xml')   # get xml input files to be processed

# max() over getctime selects the file with the LATEST ctime (the most recent
# one). NOTE(review): the original comment said "oldest" - use min() if the
# oldest file is what was intended. Raises ValueError when no file matches.
xmlfile = max(list_of_input_files, key=os.path.getctime)

# Show only the file name (directory part is not used afterwards in this cell).
pathname,filename = os.path.split(xmlfile)
print(f"'{filename}'")
print()

'msg1234567890.xml'



In [94]:
# Re-read the message metadata and display the resulting object.
# NOTE(review): `info_map` is not defined in the visible part of this file - confirm.
a = Read_Header(xmlfile,info_map)
a

Pandas(Index='msgId', Xpath='n1:isfDataset/msgId/id', Value='')


AttributeError: can't set attribute

In [83]:
# Rebuild a message from the values read in the previous cell.
# Relies on `a` exposing an `info` mapping {information name: value}.
b = Write_Header(a.info,header_dict)

NameError: name 'a' is not defined

In [84]:
# Define the pos_hash function, which returns a positive hash number (as a string).
# str hashes are salted with a random seed at each Python start-up; to make them
# reproducible, set the PYTHONHASHSEED value, e.g.:
# %env PYTHONHASHSEED=19531130

import sys
def pos_hash(s):
    """Return ``hash(s)`` as a decimal string, shifted up when negative.

    A negative hash value has ``sys.maxsize`` added to it before being
    converted; a non-negative one is converted unchanged.
    """
    digest = hash(s)
    shifted = digest + sys.maxsize if digest < 0 else digest
    return str(shifted)

In [85]:
import hashlib
def _create_uid(msg_id='BIKE TREK AIX-EN-PROVENCE ON 2020-02-25T06:27:35.000Z msg4762705562253142615'):
    '''message uid is msg+ hashcode(message id)

    msg_id: the message id to hash. Defaults to the sample id that was
            previously hard-coded, so existing calls without argument
            behave as before.
    Returns the string 'msg' followed by pos_hash(msg_id).
    The value is only reproducible between Python runs when str hashing
    is made deterministic (PYTHONHASHSEED).
    '''
    return 'msg' + pos_hash(msg_id)

In [None]:
# Smoke test: generate a message uid and display it.
a = _create_uid()
print(a)

In [None]:
class Header_Trailer_Reader:
    """Read an XML ECG record and export its header and samples.

    The file is parsed with ``xmltodict``; the record is expected under
    ``sapphire/dcarRecord/patientInfo`` (presumably a GE MUSE export, judging
    by the 'GEMuseXML' default file names - TODO confirm).

    NOTE(review): ``xmltodict`` is not imported in the visible part of this
    file; it must be available at module level for the constructor to work.

    Public attributes set by a successful construction:
        dic        -- the whole document as nested dictionaries
        header     -- {'PatientInfo', 'DeviceInfo', 'AcquisitionInfo'} dictionary
        dataArray  -- numpy int array, one row per sample, one column per lead
        dataObject -- {lead name: column of dataArray}
        dataFrame  -- pandas DataFrame built from dataObject

    Any exception raised while loading is caught and its traceback printed;
    the instance is then only partially initialised.
    """

    def __init__(self, path):
        try:
            with open(path, 'rb') as fd:
                self.dic = xmltodict.parse(fd.read().decode('utf8'))
            self.__path = path
            self.__patientInfoNode = self.dic['sapphire']['dcarRecord']['patientInfo']
            order = self.__patientInfoNode['visit']['order']
            self.__ecgNode = order['ecgResting']['params']['ecg']['wav']['ecgWaveformMXG']
            self.header = self.__makeHeaderDic()
            self.__makeDataArray()
            self.__makeStructuredArray()

        except Exception:
            # Deliberately best-effort: report the failure and carry on.
            # print_exc() already writes the traceback (it returns None).
            traceback.print_exc()

    @staticmethod
    def __asList(node):
        """Wrap a lone dictionary node in a list; return lists unchanged.

        A child element that occurs once is parsed as a dictionary, one that
        repeats as a list of dictionaries; callers want to iterate both.
        """
        return [node] if isinstance(node, dict) else node

    @staticmethod
    def __defaultName():
        """Return the default output base name: 'GEMuseXML' + UTC timestamp."""
        return 'GEMuseXML' + strftime("%Y-%m-%d_%H-%M-%S", gmtime())

    def __waveforms(self):
        """Return the list of per-lead 'ecgWaveform' nodes."""
        return self.__asList(self.__ecgNode['ecgWaveform'])

    def __makeHeaderDic(self):
        """Assemble the three header sections into one dictionary."""
        patientInfo = self.__patientInfoHeader()
        deviceInfo = self.__deviceInfoHeader()
        acquisitionInfo = self.__aquisitionInfoHeader()
        return {'PatientInfo': patientInfo, 'DeviceInfo': deviceInfo, 'AcquisitionInfo': acquisitionInfo}

    def __patientInfoHeader(self):
        """Return patient name, id, gender, race and pacemaker flag.

        Name and id are reported as 'Unknown' when the record has an
        'unknownID' entry or no 'name' entry.
        """
        node = self.__patientInfoNode
        if 'unknownID' in node or 'name' not in node:
            given_name = 'Unknown'
            family_name = 'Unknown'
            patient_id = 'Unknown'
        else:
            given_name = node['name']['given']['@V']
            family_name = node['name']['family']['@V']
            patient_id = node['identifier']['id']['@V']
        gender = node['gender']['@V']
        race = node['raceCode']['@V']
        pacemaker = node['visit']['order']['testInfo']['hasPacemaker']['@V']
        return {'Given_Name': given_name, 'Family_Name': family_name, 'ID': patient_id, 'Gender': gender, 'Race': race, 'Pacemaker': pacemaker}

    def __deviceInfoHeader(self):
        """Return model, name and serial number of the recording device."""
        device = self.__patientInfoNode['visit']['order']['device']
        return {
            'DeviceModel': device['modelID']['@V'],
            'DeviceName': device['deviceName']['@V'],
            'DeviceSerial': device['serialID']['@V'],
        }

    def __aquisitionInfoHeader(self):
        """Return acquisition date, amplitude scaling, filters, sample rate and lead info."""
        acquisitionDate = self.__patientInfoNode['visit']['order']['testInfo']['acquisitionDateTime']['@V']
        ecg = self.__ecgNode
        LeadAmplitudeUnitsPerBit = ecg['@S']
        LeadAmplitudeUnits = ecg['@U']
        Res = ecg['@INV']
        filters = self.__getFilterInfo()
        sampleRate = {'SampleRate': ecg['sampleRate']['@V'], 'Units': ecg['sampleRate']['@U']}
        leadsInformation = self.__getLeadInfo()
        return {'Resolution': Res, 'AcquisitionDate': acquisitionDate, 'LeadAmplitudeUnitsPerBit': LeadAmplitudeUnitsPerBit, 'LeadAmplitudeUnits': LeadAmplitudeUnits, 'Filters': filters, 'SampleRate': sampleRate, 'LeadsInformation': leadsInformation}

    def __getFilterInfo(self):
        """Return the high-pass, low-pass and algorithm filter descriptions."""
        filtersNode = self.__ecgNode['filters']

        def pass_filter(node):
            return {'Frequency': node['frequency']['@V'], 'Units': node['frequency']['@U'], 'Order': node['order']['@V']}

        highPass = pass_filter(filtersNode['highPass'])
        lowPass = pass_filter(filtersNode['lowPass'])
        # A single algorithm is a dictionary, several are a list: normalise
        # instead of relying on 'name' being the first key of the dictionary.
        algorithms = [
            {'Name': algo['name']['@V'], 'Purpose': algo['purpose']['@V']}
            for algo in self.__asList(filtersNode['algorithm'])
        ]
        return {'HighPass': highPass, 'LowPass': lowPass, 'Algorithms': algorithms}

    def __getLeadInfo(self):
        """Return lead names, lead labels and the number of samples.

        Also stores the lead names and the sample count (taken from the last
        lead) on the instance for the array builders.
        """
        leadsNames = []
        leadsLabels = []
        for lead in self.__waveforms():
            leadsNames.append(lead['@lead'])
            leadsLabels.append(lead['@label'])
            self.__numberOfSamples = lead['@asizeVT']
        self.__leadsNames = leadsNames
        return {'LeadsNames': leadsNames, 'LeadsLabels': leadsLabels, 'NumberOfSamples': self.__numberOfSamples}

    def __makeDataArray(self):
        """Fill ``dataArray`` with the integer samples, one column per lead."""
        self.dataArray = np.zeros((int(self.__numberOfSamples), len(self.__leadsNames)), dtype=int)
        for column, lead in enumerate(self.__waveforms()):
            # Samples are stored as one space-separated string per lead.
            self.dataArray[:, column] = list(map(int, lead['@V'].split(' ')))

    def __makeStructuredArray(self):
        """Build ``dataObject``/``dataFrame`` and the text forms used by the CSV export."""
        self.dataObject = {
            name: self.dataArray[:, column]
            for column, name in enumerate(self.__leadsNames)
        }
        self.dataFrame = pd.DataFrame(self.dataObject)

        # Fixed-width text of the frame with runs of spaces turned into commas.
        self.__data_string = re.sub(' +', ',', self.dataFrame.to_string(header=False))
        self.__header_string = 'nSeq ' + ' '.join(self.__leadsNames)
        self.header['AcquisitionInfo']['HeaderString'] = self.__header_string

    def getLead(self, lead):
        """Return the samples of one lead as a single-column DataFrame."""
        return self.dataFrame[[lead]]

    def __makeOSHeader(self):
        """Return the JSON header line written by ``saveToOPS``.

        Most fields are fixed placeholder values; device name, column names,
        date/time, sampling rate, labels and resolution come from ``header``.
        """
        deviceKey = '00:00:00:00:00:00'
        leadCount = len(self.__waveforms())
        acquisition = self.header['AcquisitionInfo']
        stamp = acquisition['AcquisitionDate'].split('T')
        self.__OSHeader = {deviceKey: {
            'sensor': ['RAW'] * leadCount,
            'device name': self.header['DeviceInfo']['DeviceName'],
            'column': self.__header_string.split(' '),
            'sync interval': 0,
            'time': (stamp[1] + '0').strip(),
            'date': stamp[0].strip(),
            'comments': '',
            'device connection': 'BTH00:00:00:00:00:00',
            'channels': list(range(1, 1 + leadCount)),
            'mode': 0,
            'digital IO': [],
            'firmware version': 770,
            'device': 'virtual_plux',
            'position': 0,
            'sampling rate': int(acquisition['SampleRate']['SampleRate']),
            'label': self.__leadsNames,
            'resolution': [int(acquisition['Resolution']).bit_length()] * leadCount,
            'special': [{}, {}, {}, {}, {}],
        }}
        return json.dumps(self.__OSHeader)

    def saveHeader(self, filename):
        """Write ``header`` as JSON to './<filename>_header.json'."""
        with open('.{}{}_header.json'.format(os.sep, filename), 'w') as out:
            out.write(json.dumps(self.header))

    def saveToCSV(self, filename=None):
        """Write a '# <column names>' line followed by the comma-separated samples to './<filename>.csv'."""
        if filename is None:
            filename = self.__defaultName()
        with open('.{}{}.csv'.format(os.sep, filename), 'w') as out:
            out.write('# ' + self.__header_string + '\n')
            out.write(self.__data_string)

    def saveToPandasCSV(self, filename=None, header=True):
        """Write the DataFrame to './<filename>_pandas.csv'; also save the header when *header* is true."""
        if filename is None:
            filename = self.__defaultName()
        self.dataFrame.to_csv('.{}{}_pandas.csv'.format(os.sep, filename))
        if header:
            self.saveHeader(filename)

    def saveToJson(self, filename=None, header=True):
        """Write header and per-lead samples as one JSON document to './<filename>.json'.

        The *header* argument is accepted for symmetry with the other
        exporters but is not used: the header is always embedded.
        """
        if filename is None:
            filename = self.__defaultName()
        tempDic = {'Header': self.header, 'Data': {}}
        for lead in self.__waveforms():
            tempDic['Data'][lead['@lead']] = list(map(int, lead['@V'].split(' ')))
        with open('.{}{}.json'.format(os.sep, filename), 'w') as out:
            out.write(json.dumps(tempDic))

    def saveToExcel(self, filename=None, header=True):
        """Write the DataFrame to './<filename>.xls'; also save the header when *header* is true.

        NOTE(review): recent pandas versions may no longer be able to write
        the legacy .xls format - confirm against the installed version.
        """
        if filename is None:
            filename = self.__defaultName()
        self.dataFrame.to_excel('.{}{}.xls'.format(os.sep, filename))
        if header:
            self.saveHeader(filename)

    def saveNumpyArray(self, filename=None, header=True):
        """Save ``dataArray`` to './<filename>.npy'; also save the header when *header* is true."""
        if filename is None:
            filename = self.__defaultName()
        np.save('.{}{}.npy'.format(os.sep, filename), self.dataArray)
        if header:
            self.saveHeader(filename)

    def saveToOPS(self, filename=None):
        """Write an 'OpenSignals Text File Format' file to './<filename>.txt'."""
        if filename is None:
            filename = self.__defaultName()
        with open('.{}{}.txt'.format(os.sep, filename), 'w') as out:
            out.write('# OpenSignals Text File Format\n')
            out.write('# ' + self.__makeOSHeader() + '\n')
            out.write('# EndOfHeaders\n')
            out.write(self.dataFrame.to_string(header=False))


if __name__ == "__main__":
    # Command-line entry point: read one XML file and export it in every
    # format requested by the options below.

    def parseArgParser(reader, arg, kind):
        """Run the exporter(s) selected by *kind* on *reader*.

        *arg* is the option value: the output base name, or the single-space
        placeholder argparse supplies when the option was given without a
        value, in which case each exporter picks its own default name.
        Unknown kinds are ignored.
        """
        filename = None if arg == ' ' else arg

        if kind == 'all':
            # Header is written once per base name by saveNumpyArray.
            reader.saveToCSV(filename)
            reader.saveToPandasCSV(filename, False)
            reader.saveToOPS(filename)
            reader.saveToJson(filename, False)
            reader.saveToExcel(filename, False)
            reader.saveNumpyArray(filename)
            return

        exporters = {
            'csv': reader.saveToCSV,
            'pcsv': reader.saveToPandasCSV,
            'ops': reader.saveToOPS,
            'json': reader.saveToJson,
            'excel': reader.saveToExcel,
            'numpy': reader.saveNumpyArray,
        }
        exporter = exporters.get(kind)
        if exporter is not None:
            exporter(filename)


    parser = argparse.ArgumentParser()
    parser.add_argument('file', help="file path")
    parser.add_argument("-csv", help="convert to csv", nargs='?', const=' ')
    parser.add_argument("-pcsv", help="convert to pandas csv", nargs='?', const=' ')
    parser.add_argument("-ops", help="convert to opensignals formated txt", nargs='?', const=' ')
    parser.add_argument("-x", '--excel', help="convert to excel", nargs='?', const=' ')
    parser.add_argument("-np", '--numpy', help="convert to numpy", nargs='?', const=' ')
    parser.add_argument("-json", help="convert to json", nargs='?', const=' ')
    parser.add_argument("-all", help="convert to csv, excel, numpy and json", nargs='?', const=' ')
    args = parser.parse_args()

    # The reader class defined in this file is Header_Trailer_Reader.
    reader = Header_Trailer_Reader(args.file)

    # Same order as the options were originally processed.
    for kind in ('csv', 'pcsv', 'ops', 'excel', 'numpy', 'json', 'all'):
        value = getattr(args, kind)
        if value:
            parseArgParser(reader, value, kind)
