In [421]:
pip install bitstring

Note: you may need to restart the kernel to use updated packages.


In [422]:
# Import libraries
import numpy as np
import pandas as pd
import time
from time import strftime
from time import gmtime
import bitstring
from bitstring import BitArray

In [434]:
# Define repo
# NOTE(review): these are absolute, machine-specific paths; adjust `root` when running elsewhere.
root = 'C:/Users/Consultant/Desktop/Data/'
# The recordings are read from the root directory itself.
repoData = root
repoCon = f'{root}NAT/'
filename = 'z5700015________zr_57_83015_____dcu4____________dcucbmdata______190415_034515.bin'

In [424]:
# Load table
IDStation = 0

# Load data: read the whole binary file into memory as bytes.
# The context manager closes the handle even if the read fails, and keeps the
# open/read/close sequence in a single cell so it cannot be run half-way.
with open(repoData + filename, "rb") as f:
    strbyte = f.read()

In [426]:
#Used in cycling_write_out
def find_all(s, c):
    """Return every index at which `c` occurs in `s`, in ascending order.

    The search restarts one position after each hit, so overlapping
    occurrences are reported too. An empty list is returned when `c`
    does not occur at all.
    """
    positions = []
    cursor = s.find(c)
    while cursor != -1:
        positions.append(cursor)
        # resume right after the current hit
        cursor = s.find(c, cursor + 1)
    return positions

In [427]:
### Convert a bin() string such as '0b10100' (with its '0b' prefix) to a zero-padded 8-character octet such as '00010100' (without the prefix)
def octetfull(s):
    """Strip the two-character prefix of `s` and left-pad the rest with '0'.

    `s` is expected to look like the output of `bin()` (e.g. '0b101'); the
    result is padded so that prefix-less digits fill 8 characters
    ('00000101'). Digit strings longer than 8 characters are returned
    unpadded and untruncated.
    """
    # 8 wanted digits + 2 prefix characters = 10; a negative count pads nothing
    missing_zeros = 10 - len(s)
    digits = s[2:]
    return '0' * missing_zeros + digits

In [428]:
# Based on table 3.3 of the documentation: returns the size needed for a specific type of data (the original note says bytes, but the values 1/8/16/32/64 look like bit widths - TODO confirm).
def switchtype(x):
    """Return the storage size associated with data-type code `x`.

    Type codes 0 to 11 are supported; any other code raises `KeyError`.
    NOTE(review): the sizes (1, 8, 16, 32, 64) look like bit widths rather
    than byte counts - confirm against the format documentation.
    """
    # position in the tuple == type code
    sizes_in_code_order = (1, 1, 8, 8, 16, 16, 32, 32, 64, 64, 32, 64)
    size_by_type = dict(enumerate(sizes_in_code_order))
    return size_by_type[x]

In [435]:
# Class which initializes and structures the binary data (input: raw bytes of the .bin file, output: readable decoded data)
class InputData:
    """Decoder for the raw bytes of a CBM ``.bin`` recording.

    The constructor only slices the byte string into three regions. The
    decoding steps must then be called in this order, because each one reads
    attributes set by an earlier one::

        header_decode() -> body_decode() -> footer_decode() -> cycling_write_out()

    Main results:
        header_info          list of [label, value] pairs read from the header
        sample_variable      one descriptor (list of [key, value] pairs) per sample variable
        spec_header_variable one descriptor per specific-header variable
        measurement_data     one decoded frame per record of the data area
        sample               DataFrame with the reconstructed bit line of every record
        final_data           DataFrame with one row per record and one column per variable
    """

    # (attribute name, label stored in header_info, start offset, end offset) of
    # every little-endian unsigned integer read from the header region.
    _HEADER_FIELDS = (
        ('cbm_version', 'Version du CBM', 0, 2),
        ('nb_oct_allocated_IOs', 'Nombre Octets Alloués pour les IOs', 2, 3),
        ('nb_oct_allocated_analogs', 'Nombre Octets Alloués pour les Analogs', 3, 4),
        ('nb_vars_echantillon', 'Nombre de Variables Echantillons', 4, 6),
        ('nb_vars_EnteteSpec', 'Nombre de Variables Entêtes Specifiques', 6, 8),
        ('cycle_writing_area_size', 'Taille de la zone de data (octet)', 8, 12),
        ('nb_writing', 'Nombre Enregistrements', 12, 16),
        ('cbm_start_time', 'Temps de démarrage (s)', 16, 24),
        ('cbm_acquisition_time', 'Temps entre deux échantillons (ms)', 24, 26),
    )

    # Layout of one reconstructed sample line (a string of '0'/'1' characters).
    # These offsets are hard-coded, not derived from the decoded variable
    # descriptors - presumably they match the specific recording this notebook
    # targets; TODO confirm before using another file layout.
    _FLAG_COUNT = 43                  # characters 0..42: one column each, kept as '0'/'1'
    _UNSIGNED_FIELDS = ((43, 48),)    # slices parsed with int(bits, 2)
    _SIGNED_FIELDS = (                # slices parsed with BitArray(bin=bits).int
        (48, 120),
        (120, 136),
        (136, 152),
        (152, 168),
        (168, 184),
        (184, 200),
    )

    def __init__(self, strbyte):
        """Split the raw file content into header, body and footer (data) regions.

        Args:
            strbyte: full content of the binary file, as ``bytes``.
        """
        self.header = strbyte[0:26]
        self.body = strbyte[64:2216]
        # NOTE(review): the slice stops at -1, so the last byte of the file is left out.
        self.footer = strbyte[4160:-1]

    def header_decode(self):
        """Decode the header region into attributes and ``self.header_info``.

        Every field is read as a little-endian unsigned integer and stored both
        as an attribute (see ``_HEADER_FIELDS`` for the names) and as a
        ``[label, value]`` pair appended to ``self.header_info``.
        """
        self.header_info = []
        for attribute, label, start, stop in self._HEADER_FIELDS:
            value = int.from_bytes(self.header[start:stop], byteorder='little', signed=False)
            setattr(self, attribute, value)
            self.header_info.append([label, value])

    def body_decode(self):
        """Decode the variable descriptors stored in the body region.

        Requires ``header_decode()`` to have run (the IO/analog byte counts are
        used to classify each sample variable).

        Sets ``sample_variable``, ``spec_header_variable``, ``datasize``,
        ``data_oct_position`` and ``data_byte_position``.
        """
        self.sample_variable = []
        self.spec_header_variable = []

        # \xca is the delimiter between sample variables
        body_list = self.body.split(b'\xca')

        # After the split the list is
        # [sample variable 1, sample variable 2, ..., block of spec-header variables, trailing chunk].
        # Inside the spec-header block every element ends with \xfe, so that split
        # also leaves a trailing chunk which is dropped.
        body_list_sample = body_list[:-2]
        body_list_header = body_list[-2].split(b'\xfe')[:-1]

        self.datasize = []
        self.data_oct_position = []
        self.data_byte_position = []

        for element in body_list_sample:
            # the last three bytes are: type code, bit position, sample position
            type_code, bit_position, sample_position = element[-3], element[-2], element[-1]

            if sample_position < self.nb_oct_allocated_IOs:
                signal_type = 'IO'
            elif sample_position < (self.nb_oct_allocated_analogs + self.nb_oct_allocated_IOs):
                signal_type = 'Analog'
            else:
                signal_type = 'File error'

            self.datasize.append(switchtype(type_code))
            self.data_oct_position.append(sample_position)
            self.data_byte_position.append(bit_position)

            self.sample_variable.append([
                ['size', element[0]],
                ['name', element[1:-3].decode('latin-1')],
                ['type', type_code],
                ['bitposition', bit_position],
                ['sampleposition', sample_position],
                ['signal_type', signal_type],
            ])

        for element in body_list_header:
            # the name of each variable ends with \x00 (kept in the decoded name)
            index = element.find(b'\x00')
            value_size = element[index + 2]
            self.spec_header_variable.append([
                ['size', element[0]],
                ['name', element[1:index + 1].decode('latin-1')],
                ['type', element[index + 1]],
                ['sizevalue', value_size],
                ['value', element[index + 3:index + 3 + value_size]],
            ])

    def footer_decode(self, verbose=True):
        """Decode the frames of the data area into ``self.measurement_data``.

        Requires ``header_decode()`` to have run (``cycle_writing_area_size``
        bounds the loop).

        Each frame becomes a list of six entries, in this order:
        ``['U8Transition', high nibble]``, ``['U8SleepMode', low nibble]``,
        ``['U24TimeStamp', 'HH:MM:SS']``,
        ``['u8ZipControlBits', number of set bits, bin() string]``,
        ``['auZipBitData', number of set bits, bit string]`` and
        ``['measurement', [[8-character octet], ...]]``.

        Args:
            verbose: when true (the default, as before) print the control byte
                and the expanded bit data of every frame.
        """
        self.measurement_data = []

        # each frame has at least 5 bytes: 1 transition/sleep byte, 3 timestamp bytes, 1 control byte
        min_size_per_frame = 5
        empty_octet = '0' * 8

        index = 0
        while index < self.cycle_writing_area_size:
            frame = []

            first_byte = self.footer[index]
            # Split the first byte into its two hexadecimal digits. The previous
            # hex(byte)[-2] returned the 'x' of the '0x' prefix for bytes below 0x10.
            frame.append(['U8Transition', format(first_byte >> 4, 'x')])
            frame.append(['U8SleepMode', format(first_byte & 0x0F, 'x')])

            # 24-bit little-endian value in milliseconds, rendered as a time of day
            ms = int.from_bytes(self.footer[index + 1:index + 4], byteorder='little', signed=False)
            frame.append(['U24TimeStamp', strftime("%H:%M:%S", gmtime(ms / 1000))])

            # Every set bit of the control byte announces one bit-data byte.
            control_byte = bin(self.footer[index + 4])
            nb_control_byte = control_byte.count("1")
            frame.append(['u8ZipControlBits', nb_control_byte, control_byte])
            if verbose:
                print('Control Byte = ' + control_byte)

            # Expand the control byte, least significant bit first: a cleared bit
            # contributes an all-zero octet, a set bit consumes the next bit-data
            # byte of the frame. Octets are prepended, so the first one ends up
            # right-most in data_bit.
            data_bit = ''
            pointer = 0
            for flag in reversed(control_byte[2:]):
                if flag == '0':
                    data_bit = empty_octet + data_bit
                else:
                    position = index + min_size_per_frame + pointer
                    # Read from this instance (the code used to read the global
                    # `data`). The one-byte slice yields 0 past the end of the
                    # footer instead of raising.
                    octet = int.from_bytes(self.footer[position:position + 1], byteorder='little', signed=False)
                    data_bit = octetfull(bin(octet)) + data_bit
                    pointer = pointer + 1

            if verbose:
                print('ZipBitData = ' + data_bit)

            # Every set bit of the bit data announces one measurement byte.
            nb_data_bit = data_bit.count('1')
            frame.append(['auZipBitData', nb_data_bit, data_bit])

            payload_start = index + min_size_per_frame + nb_control_byte
            subframe = [
                [octetfull(bin(self.footer[payload_start + offset]))]
                for offset in range(nb_data_bit)
            ]

            frame.append(["measurement", subframe])
            self.measurement_data.append(frame)
            index = payload_start + nb_data_bit

    def cycling_write_out(self):
        """Replay the frames into full sample lines and build the result DataFrames.

        Requires ``header_decode()``, ``body_decode()`` and ``footer_decode()``
        to have run.

        Sets ``sample_list`` / ``id_list`` / ``sample`` (the reconstructed bit
        line of every record) and ``final_data`` (a 'TimeStamp' column followed
        by one column per sample variable).

        Raises:
            IndexError: if fewer sample variables were decoded than columns are
                defined by the hard-coded sample layout.
        """
        # Start from an all-zero line covering the IO and analog bytes.
        dataline = "0" * self.nb_oct_allocated_IOs * 8 + "0" * self.nb_oct_allocated_analogs * 8
        # A list (not a set) keeps the column order deterministic and is accepted by pandas >= 2.0.
        self.sample = pd.DataFrame(columns=['ID', 'Sample', 'Sub Sample'])
        self.sample_list = []
        self.id_list = []

        for i in range(self.nb_writing):
            frame = self.measurement_data[i]

            # Positions of the set bits, reading the bit data from right to left:
            # each one is the index of an octet that changed in this record.
            index_list = find_all(frame[4][-1][::-1], '1')

            for j, octet_index in enumerate(index_list):
                new_write = frame[5][-1][j][0]
                start = octet_index * 8
                # overwrite the chosen octet with the new value, bit order flipped
                dataline = dataline[:start] + new_write[::-1] + dataline[start + 8:]

            self.id_list.append(i)
            self.sample_list.append(dataline)

        self.sample['ID'] = self.id_list
        self.sample['Sample'] = self.sample_list

        self.final_data = pd.DataFrame(columns=['TimeStamp'])
        self.final_data['TimeStamp'] = [
            self.measurement_data[ind][2][1] for ind in range(self.nb_writing)
        ]

        # Decode every sample line into one value per output column.
        column_count = self._FLAG_COUNT + len(self._UNSIGNED_FIELDS) + len(self._SIGNED_FIELDS)
        decoded_columns = [[] for _ in range(column_count)]
        for line in self.sample_list:
            values = [line[position] for position in range(self._FLAG_COUNT)]
            values.extend(int(line[start:stop], 2) for start, stop in self._UNSIGNED_FIELDS)
            values.extend(BitArray(bin=line[start:stop]).int for start, stop in self._SIGNED_FIELDS)
            for column, value in zip(decoded_columns, values):
                column.append(value)

        labels = [variable[1][1].replace('\x00', '') for variable in self.sample_variable]
        if len(labels) < column_count:
            raise IndexError(
                'expected at least %d sample variables, found %d' % (column_count, len(labels))
            )

        # First create every variable column (fixes the column order and leaves
        # variables beyond the decoded layout at 0) ...
        for label in labels:
            self.final_data[label] = 0

        # ... then fill the decoded ones. The first column takes its label from
        # the decoded descriptor like all the others, instead of a hard-coded name.
        for position, values in enumerate(decoded_columns):
            self.final_data[labels[position]] = values

In [436]:
# Build the decoder from the raw file bytes; the constructor only slices them into header, body and footer regions.
data = InputData(strbyte)

In [437]:
# Run the decoding steps in order: each one reads attributes set by the previous ones.
# header_decode() sets the IO/analog byte counts, the data-area size and the record count;
# body_decode() uses the byte counts to classify the sample variables;
# footer_decode() walks the data area (it prints the control byte and bit data of every frame);
# cycling_write_out() combines all of the above into data.sample and data.final_data.
data.header_decode()
data.body_decode()
data.footer_decode() 
data.cycling_write_out()

Control Byte = 0b1
ZipBitData = 00011001
Control Byte = 0b0
ZipBitData = 00000000
Control Byte = 0b11
ZipBitData = 0100000000000100
Control Byte = 0b0
ZipBitData = 00000000
Control Byte = 0b0
ZipBitData = 00000000
Control Byte = 0b1
ZipBitData = 00000010
Control Byte = 0b1
ZipBitData = 00000010
Control Byte = 0b1
ZipBitData = 00000100
Control Byte = 0b111
ZipBitData = 000001110100000000111000
Control Byte = 0b110
ZipBitData = 000001010100000000000000
Control Byte = 0b110
ZipBitData = 000001010100000000000000
Control Byte = 0b110
ZipBitData = 000001110100000000000000
Control Byte = 0b110
ZipBitData = 000001111100000000000000
Control Byte = 0b110
ZipBitData = 000001011100000000000000
Control Byte = 0b110
ZipBitData = 000001111100000000000000
Control Byte = 0b110
ZipBitData = 000001111100000000000000
Control Byte = 0b110
ZipBitData = 000001111100000000000000
Control Byte = 0b110
ZipBitData = 000001011100000000000000
Control Byte = 0b110
ZipBitData = 000011110100000000000000
Control Byte =

In [439]:
# Inspect the sample-variable descriptors built by body_decode() (one list of [key, value] pairs per variable).
data.sample_variable

[[['size', 25],
  ['name', 'LT-V-DVR est active\x00'],
  ['type', 0],
  ['bitposition', 0],
  ['sampleposition', 0],
  ['signal_type', 'IO']],
 [['size', 23],
  ['name', 'LT-V<2 est active\x00'],
  ['type', 0],
  ['bitposition', 1],
  ['sampleposition', 0],
  ['signal_type', 'IO']],
 [['size', 23],
  ['name', 'LT-AO1 est active\x00'],
  ['type', 0],
  ['bitposition', 2],
  ['sampleposition', 0],
  ['signal_type', 'IO']],
 [['size', 23],
  ['name', 'LT-AO2 est active\x00'],
  ['type', 0],
  ['bitposition', 3],
  ['sampleposition', 0],
  ['signal_type', 'IO']],
 [['size', 23],
  ['name', 'LT-Ads est active\x00'],
  ['type', 0],
  ['bitposition', 4],
  ['sampleposition', 0],
  ['signal_type', 'IO']],
 [['size', 23],
  ['name', 'LT-Acq est active\x00'],
  ['type', 0],
  ['bitposition', 5],
  ['sampleposition', 0],
  ['signal_type', 'IO']],
 [['size', 22],
  ['name', 'LT-CF est active\x00'],
  ['type', 0],
  ['bitposition', 6],
  ['sampleposition', 0],
  ['signal_type', 'IO']],
 [['size', 2

In [433]:
# Export the final DataFrame as CSV ('door_data.csv' in the current working directory, without the index column).
# NOTE(review): this cell's execution count (433) is lower than that of the decoding cells above,
# so the saved file may predate the latest run - re-run it after cycling_write_out().
data.final_data.to_csv('door_data.csv', index=False)