# OPUS dei


A program that converts all the Bruker OPUS files containing IR absorbance spectra in a specific folder into data files.

### Changelog

1.0 - Original program  
1.1 - Adds a safeguard so that a corrupt file does not stop the whole process  
2.0 - New system to save memory space  
2.1 - Solves some minor bugs  
2.2 - Solves a minor bug with mzz files  
2.3 - Loads files from subfolders

### How to use it:

If you are using Jupyter, just go to `Cell`, press `Run All`, and then enter the location of the folder that contains the OPUS files.

### Enjoy!

First we import the packages that we are going to use

In [1]:
import numpy as np
import struct
import os
import zipfile
import zlib

Then we indicate where the spectra are. The easiest way to do this is to go to the folder that contains the spectra, right-click on it, select Properties, and then copy the text to the right of "Location:". Remember to add `/` at the end if you are using UNIX and `\` if you are using Windows.

In [2]:
# Ask for the folder that holds the OPUS files.  Surrounding whitespace and
# quotation marks are removed because they are easily picked up when a path is
# pasted, and they would make the folder impossible to find.
foldername = input("Please, indicate the folder location: ").strip().strip('"\'')

Please, indicate the folder location: C:\Users\Mario\Desktop\Francesco\


Now here is the code for the opus file reader

In [3]:
class OpusReader(dict):
    """Load a Bruker OPUS file and expose its decoded blocks as a dictionary.

    The whole file is read into memory on construction and the block
    directory found in the header is parsed straight away (see
    ``readHeader``).  ``readDataBlocks`` then decodes every listed block and
    stores the results under descriptive keys, e.g. ``'AB'`` for a block of
    type 15 and ``'AB Data Parameter'`` for a block of type 31.

    Attributes set by the constructor:
        opusFile: the (already closed) file object the data was read from.
        data: the complete file content as ``bytes``.
        Nd: ``len(data)``.
        header: the leading bytes of ``data`` that hold the directory.
        offsetList, chunkSizeList, typeList, channelList, textList:
            one entry per block found in the directory.
        dataBlockList: every ``DataBlock`` built by ``readDataBlocks``.
        parameterList: one ``{'name', 'type', 'children'}`` dict per named
            block built by ``readDataBlocks``.
    """

    # Number of leading bytes that are scanned for directory entries.
    _HEADER_SIZE = 504
    # Position of the offset field of the first directory entry.
    _FIRST_OFFSET_FIELD = 32
    # Distance in bytes between two consecutive directory entries.
    _ENTRY_STRIDE = 12

    # Names for blocks of type 0, selected by their text type.
    _TEXT_BLOCK_NAMES = {
        8: 'Info Block',
        104: 'History',
        152: 'Curve Fit',
        168: 'Signature',
        240: 'Integration Method',
    }
    # Keys for numeric blocks, selected by (block type, channel).
    _SPECTRUM_KEYS = {
        (7, 4): 'ScSm',
        (7, 8): 'IgSm',
        (7, 12): 'PhSm',
        (11, 4): 'ScRf',
        (11, 8): 'IgRf',
    }
    # Names for parameter blocks that depend on (block type, channel).
    _CHANNEL_PARAMETER_NAMES = {
        (23, 4): 'ScSm Data Parameter',
        (23, 8): 'IgSm Data Parameter',
        (23, 12): 'PhSm Data Parameter',
        (27, 4): 'ScRf Data Parameter',
        (27, 8): 'IgRf Data Parameter',
    }
    # Names for parameter blocks that depend on the block type only.
    _PARAMETER_BLOCK_NAMES = {
        31: 'AB Data Parameter',
        32: 'Instrument',
        40: 'Instrument (Rf)',
        48: 'Acquisition',
        56: 'Acquisition (Rf)',
        64: 'Fourier Transformation',
        72: 'Fourier Transformation (Rf)',
        96: 'Optik',
        104: 'Optik (Rf)',
        160: 'Sample',
    }

    def __init__(self, filename):
        """Read ``filename`` completely and parse its header.

        Args:
            filename: path of the file to open in binary mode.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        # The context manager closes the handle even when reading fails.
        with open(filename, 'rb') as opusFile:
            self.opusFile = opusFile
            self.data = opusFile.read()

        self.Nd = len(self.data)

        self.dataBlockList = []
        self.parameterList = []

        self.readHeader()

    def readHeader(self):
        """Parse the block directory and fill the five per-block lists.

        Entries are 12 bytes apart.  Relative to the offset field of an
        entry, the chunk size (counted in 4-byte words) sits 4 bytes before
        it and the block type, channel and text type bytes sit 8, 7 and 6
        bytes before it.  Scanning stops at the first entry whose offset is
        zero, when the block just read ends at or beyond the end of the
        file, or when the header is exhausted.
        """
        # Never look past the data that is really there: an empty or
        # truncated file then yields an empty directory instead of a
        # struct.error from unpacking a short slice.
        Nh = min(self._HEADER_SIZE, self.Nd)
        self.header = self.data[0:Nh]

        self.offsetList = []
        self.chunkSizeList = []
        self.typeList = []
        self.channelList = []
        self.textList = []

        cursor = self._FIRST_OFFSET_FIELD

        while cursor + 4 <= Nh:
            offset = struct.unpack_from('<I', self.header, cursor)[0]
            if offset == 0:
                # An empty offset field marks the end of the directory.
                break

            chunkSize = struct.unpack_from('<I', self.header, cursor - 4)[0]
            blockType, channel, textType = struct.unpack_from(
                '<3B', self.header, cursor - 8)

            self.offsetList.append(offset)
            self.chunkSizeList.append(chunkSize)
            self.typeList.append(blockType)
            self.channelList.append(channel)
            self.textList.append(textType)

            if offset + 4 * chunkSize >= self.Nd:
                # The next block would start at or after the end of file.
                break

            cursor += self._ENTRY_STRIDE

    def readDataBlocks(self):
        """Decode every block listed in the header and store the results.

        For each directory entry a ``DataBlock`` is built and appended to
        ``dataBlockList``.  Numeric blocks (types 7, 11 and 15) are stored
        as NumPy arrays; text and parameter blocks are stored as the
        ``DataBlock`` itself and additionally get an entry in
        ``parameterList``.  Blocks whose type is not recognised print
        ``Error`` and are kept only in ``dataBlockList``.

        When an ``'AB Data Parameter'`` block was found, ``self['WN']`` is
        set to ``np.linspace(FXV, LXV, NPT)`` built from its values.
        """
        for iBlock in range(len(self.offsetList)):
            blockType = self.typeList[iBlock]
            textType = self.textList[iBlock]
            channel = self.channelList[iBlock]

            dataBlock = DataBlock(
                chunk=self.readChunk(iBlock),
                chunkSize=self.chunkSizeList[iBlock],
                blockType=blockType, textType=textType)

            self.dataBlockList.append(dataBlock)

            # Stays None for numeric blocks and for anything unrecognised.
            dataBlockName = None

            if blockType == 0:
                dataBlockName = self._TEXT_BLOCK_NAMES.get(
                    textType, 'Text Information')

            elif blockType in (7, 11):
                key = self._SPECTRUM_KEYS.get((blockType, channel))
                if key is not None:
                    self[key] = np.array(dataBlock.values)

            elif blockType == 15:
                self['AB'] = np.array(dataBlock.values)

            elif blockType in (23, 27):
                # An unrecognised channel leaves the name at None, so the
                # block is skipped rather than stored under the key None.
                dataBlockName = self._CHANNEL_PARAMETER_NAMES.get(
                    (blockType, channel))

            elif blockType in self._PARAMETER_BLOCK_NAMES:
                dataBlockName = self._PARAMETER_BLOCK_NAMES[blockType]

            else:
                print("Error")

            if dataBlockName is not None:
                self[dataBlockName] = dataBlock

                parameter = {'name': dataBlockName, 'type': 'group'}
                parameter['children'] = dataBlock.parameterList
                self.parameterList.append(parameter)

        if 'AB Data Parameter' in self:
            fxv = self['AB Data Parameter']['FXV']
            lxv = self['AB Data Parameter']['LXV']
            npt = self['AB Data Parameter']['NPT']
            self['WN'] = np.linspace(fxv, lxv, npt)

    def readChunk(self, iBlock):
        """Return the raw bytes of block number ``iBlock``.

        The slice starts at the block's offset and spans four bytes per
        unit of its chunk size.
        """
        i1 = self.offsetList[iBlock]
        i2 = i1 + 4 * self.chunkSizeList[iBlock]

        return self.data[i1:i2]

###############################################################################
class DataBlock(dict):
    """One block of an OPUS file, decoded according to its block type.

    Keyword arguments recognised by the constructor:
        chunk: the raw bytes of the block.
        chunkSize: the number of 4-byte values in ``chunk``.
        blockType: selects how the chunk is decoded (see ``readChunk``).
        textType: sub-type used for blocks of type 0; defaults to -1.

    Depending on the block type the result is available as
        * dictionary items plus ``parameterList`` (parameter blocks),
        * the ``values`` attribute, a tuple of floats (numeric blocks), or
        * the ``text`` attribute (text blocks).

    Note that on numeric blocks the ``values`` attribute hides the regular
    ``dict.values`` method of the instance.
    """

    def __init__(self, **kwargs):
        """Store the recognised keyword arguments and decode the chunk."""
        self.textType = -1

        # Other keywords are ignored; a recognised one that is missing
        # simply leaves its attribute unset.
        for key in ("chunk", "chunkSize", "blockType", "textType"):
            if key in kwargs:
                setattr(self, key, kwargs[key])

        self.parameterList = []

        self.readChunk()

    def readChunk(self):
        """Decode ``self.chunk`` with the reader that matches the block type.

        Type 0 is a parameter list when the text type is 8 and plain text
        otherwise; types 7, 11 and 15 are arrays of floats; every other
        type is treated as a parameter list.
        """
        if self.blockType == 0:
            if self.textType == 8:
                self.readParameter()
            else:
                self.readText()
        elif self.blockType in (7, 11, 15):
            self.readData()
        else:
            self.readParameter()

    def readParameter(self):
        """Decode the chunk as a list of named parameters.

        Every parameter becomes a dictionary item and a
        ``{'name', 'value', 'type'}`` entry in ``parameterList``.
        """
        self._parseParameters(echo=False)

    def readData(self):
        """Decode the chunk as ``chunkSize`` little-endian 32-bit floats."""
        fmt = '<' + str(self.chunkSize) + 'f'
        self.values = struct.unpack(fmt, self.chunk)

    def readText(self):
        """Decode the chunk as Latin-1 text."""
        self.text = self.chunk.decode('latin-1')

    def readInfo(self):
        """Like ``readParameter``, but print every parameter that is read."""
        print('Info')
        self._parseParameters(echo=True)

    def _parseParameters(self, echo):
        """Shared parser behind ``readParameter`` and ``readInfo``.

        Each entry consists of a three-byte name, one unused byte, a 16-bit
        type index, a 16-bit size counted in 2-byte units, and the value.
        Parsing stops at the entry named ``END`` or when the chunk has no
        room left for another entry header.

        Args:
            echo: when true, print name, type and value of each parameter.

        Raises:
            struct.error: if an integer or float value does not have the
                size its type requires.
        """
        self.parameterTypes = ['int', 'float', 'str', 'str', 'str']

        chunk = self.chunk
        cursor = 0

        while True:
            rawName = chunk[cursor:cursor + 3]
            try:
                parameterName = rawName.decode("utf-8")
            except UnicodeDecodeError:
                print("Error")
                # Keep a distinct (if unreadable) name so that the value
                # does not overwrite the previously read parameter.
                parameterName = rawName.decode("utf-8", errors="replace")

            if parameterName == 'END':
                return

            if cursor + 8 > len(chunk):
                # No terminating END entry and no room for another header:
                # stop instead of failing on an empty slice.
                return

            typeIndex, parameterSize = struct.unpack_from(
                '<HH', chunk, cursor + 4)

            if typeIndex < len(self.parameterTypes):
                parameterType = self.parameterTypes[typeIndex]
            else:
                print("Error")
                # The value is kept as raw bytes below.
                parameterType = 'bytes'

            valueStart = cursor + 8
            value = chunk[valueStart:valueStart + 2 * parameterSize]

            if typeIndex == 0:
                parameterValue = struct.unpack('<i', value)[0]
            elif typeIndex == 1:
                # little-endian double
                parameterValue = struct.unpack('<d', value)[0]
            elif typeIndex in (2, 3, 4):
                # Cut at the first NUL; a string that fills the whole field
                # has none and must be kept complete.
                parameterValue = value.split(b'\x00', 1)[0].decode("latin-1")
            else:
                parameterValue = value

            self[parameterName] = parameterValue

            self.parameterList.append({
                'name': parameterName,
                'value': parameterValue,
                'type': parameterType,
            })

            cursor = valueStart + 2 * parameterSize

            if echo:
                print(parameterName, parameterType, parameterValue)

Now, we look for all the opus files in the folder:

In [20]:
# Walk the chosen folder and all of its subfolders, keeping every file whose
# full path ends in a digit.
filenames = []
for root, dirs, files in os.walk(foldername):
    candidates = (os.path.join(root, file) for file in files)
    filenames.extend(tmp for tmp in candidates if tmp[-1].isdigit())

Now we extract the spectra from the binary files and export them in a data text file.

In [22]:
for i in filenames:
    # Base name only: the temporary file further down is created in the
    # current working directory and also gives the archive member its name.
    filename = os.path.basename(i)

    # Translate the binary data and collect everything the export needs.  Any
    # failure here means the file holds no usable absorbance spectrum, so it is
    # reported and skipped instead of stopping the whole run.
    try:
        binarynightmare = OpusReader(i)
        binarynightmare.readDataBlocks()

        # The absorption spectrum and the first and last wavenumber
        y = binarynightmare["AB"]
        start = binarynightmare["AB Data Parameter"]["FXV"]
        end = binarynightmare["AB Data Parameter"]["LXV"]

        if len(y) == 0:
            raise ValueError("the absorbance block is empty")
    except Exception as error:
        print("Skipping {}: {!r}".format(i, error))
        continue

    # Wavenumbers: one per point, running from the first to the last value
    # inclusive.  linspace always returns exactly len(y) values, whereas
    # arange with a fractional step stops short of `end` and may return one
    # value too many.
    x = np.linspace(start, end, len(y))

    # The spectrum as two columns: wavenumber and absorbance
    spectrum = np.transpose(np.array([x, y]))

    # And we save the file
    np.savetxt(i + ".dpt", spectrum, fmt="%10.5f", delimiter="\t")

    # Compressed export with a resolution of one wavenumber: round the
    # wavenumbers ...
    rounded_wvn = [[int(round(row[0])), row[1]] for row in spectrum]
    # ... and keep the last point of each rounded wavenumber.  The final point
    # has no successor to compare with, so it is added by hand.
    short_list = [
        current
        for current, following in zip(rounded_wvn, rounded_wvn[1:])
        if current[0] != following[0]
    ]
    short_list.append(rounded_wvn[-1])

    # To save space the wavenumbers themselves are not exported: the list
    # starts with the first wavenumber, the last wavenumber and the number of
    # points, followed by the absorbance values.
    exp_list = [short_list[0][0], short_list[-1][0], len(short_list)]
    exp_list.extend(round(point[1], 4) for point in short_list)

    tmpname = filename + ".tmp"
    np.savetxt(tmpname, exp_list, fmt="%10.4f", delimiter="\t")

    # Zip-compress the file to save space.  The archive is closed and the
    # temporary file removed even when writing the archive fails.
    try:
        with zipfile.ZipFile(i + ".mzz", mode='w') as zf:
            zf.write(tmpname, compress_type=zipfile.ZIP_DEFLATED)
    finally:
        os.remove(tmpname)
 