# File Contents Converter

### バイナリファイル内の文字列を任意に置換するプログラム

#### 01. read確認

In [2]:
fr = open('test_01.DMP', 'rb')
read_data = fr.read()
print(read_data)
fr.close()

b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./0123456789:;<=>?'


#### 02. write確認

In [3]:
fw = open('write_01.DMP', 'wb')
fw.write(read_data)
fw.close()

#### 03. 置換の確認

In [5]:
for i in (b'\x00', b'\x10', b'\x20', b'\x30'):
    read_data = read_data.replace(i, b'\xAA')
fc = open('convert_01.DMP', 'wb')
fc.write(read_data)
fc.close()

#### 04. バイナリファイルを指定byteずつreadする

In [3]:
# ファイルサイズを調べなくてもEOF検出で行ける
with open('./test_01.DMP', 'rb') as fr:
    while True:
        read_data = fr.read(16)
        print(read_data)
        if read_data == b'':
            print('End of File')
            break
fr.close()

b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
b' !"#$%&\'()*+,-./'
b'0123456789:;<=>?'
b''
End of File


#### 05. 実行時に外部から与える変数を定義する

In [42]:
input_val = input('Enter header size(default 4):')
val = 4 if input_val == '' else int(input_val)

Enter header size(default 4): 6


In [25]:
target_unitID = int(input('Enter target_unitID:'))
target_unitID = target_unitID.to_bytes(1, 'little')
print(target_unitID)



target_timestamp = int(input('Enter timestamp:'))
target_timestamp = target_timestamp.to_bytes(4, 'little')
print(target_timestamp)

Enter target_unitID: 3


b'\x03'


Enter timestamp: 4567


b'\xd7\x11\x00\x00'


In [42]:
target_timestamp = int(input('Enter timestamp:'))
target_timestamp = target_timestamp.to_bytes(4, 'big')
print(target_timestamp)

Enter timestamp: 67438087


b'\x04\x05\x06\x07'


In [45]:
target_timestamp = int(input('Enter timestamp:')).to_bytes(4, 'big')
print(target_timestamp)

Enter timestamp: 67438087


b'\x04\x05\x06\x07'


#### 06. 【本体】指定byteずつ読み込んだ任意の文字列を置換して出力する

In [8]:
# データイメージ /1byte/
# aa/uu/xx/xx/ts/ts/ts/ts/1280byte data/

import struct

# データ入力
target_unitID = int(input('Enter target_unitID:')).to_bytes(1, 'little')
target_timestamp = int(input('Enter timestamp:')).to_bytes(4, 'big') # 67438087
target_from = int(input('Enter angleID From:')).to_bytes(1, 'little')
target_to = int(input('Enter angleID To:')).to_bytes(1, 'little')

# 初期化
frame_header_size = 4 # DEADBEEF
line_header_size = 8 # ラインデータのヘッダ部 12
line_data_size = 4 # ラインデータ本体 1280

input_file = './input.DMP'
output_file = './output.DMP'

count = 0
read_size = 0
write_data = b''

# メイン処理
with open(input_file, 'rb') as fr:
    frame_header = fr.read(frame_header_size) #先頭4byteはスキップ  
    write_data = frame_header
    while True:
        line_header_data = fr.read(line_header_size)
        line_main_data = fr.read(line_data_size)
        read_size += line_header_size + line_data_size

        if line_header_data[1:2] == target_unitID and line_header_data[4:8] == target_timestamp and \
        line_header_data[0:1] == target_from:
            count += 1
            line_header_replaced = line_header_data[0:1].replace(target_from, target_to)
        else:
            line_header_replaced = line_header_data[0:1]
        write_data += (line_header_replaced + line_header_data[1:8] + line_main_data)

#         if line_header_data == b'' or line_main_data == b'' or \
#         (line_header_data[1:2] == target_unitID and line_header_data[4:8] > target_timestamp):
        if line_header_data == b'' or line_main_data == b'':
            print('End of File')
            break
                  
# ファイル出力
print("置換数:", count)
fw = open(output_file, 'wb')
fw.write(write_data)
fw.close()
fr.close()

input("処理完了しました。キー入力待ち。 ")

Enter target_unitID: 3
Enter timestamp: 67438087
Enter angleID From: 239
Enter angleID To: 237


End of File
置換数: 2


処理完了しました。キー入力待ち。  


''

#### 07. ファイルの分割

In [58]:
import os
import shutil
import struct
    
def divide_file(filePath, frame_header_size, chunkSize, chunk_path):
    readedDataSize = frame_header_size
    i = 0
    fileList = []
    # 対象ファイルを開く
    f = open(filePath, "rb")
    # ファイルを読み終わるまで繰り返す
    contentLength = os.path.getsize(filePath)
    while readedDataSize < contentLength:
        # 読み取り位置をシーク
        f.seek(readedDataSize)
        # 指定されたデータサイズだけ読み込む
        data = f.read(chunkSize)
        # 分割ファイルを保存
        saveFilePath = chunk_path + "/" + filePath + "." + str(i)
        with open(saveFilePath, 'wb') as saveFile:
            saveFile.write(data)
        # 読み込んだデータサイズの更新
        readedDataSize = readedDataSize + len(data)
        i = i + 1
        fileList.append(saveFilePath)
    return fileList

def check_string(filePath, timestamp):
    with open(filePath, 'rb') as temp_f:
        if timestamp in temp_f.read():
            return True # The string is found
        return False  # The string does not exist in the file

def FCC(filePath, frame_header_size, line_header_size, line_data_size, \
                target_unitID, target_timestamp, target_from, target_to):
    read_size = 0
    write_data = b''
    # メイン処理
    with open(filePath, 'rb') as fr:
#         frame_header = fr.read(frame_header_size) #先頭4byteはスキップ  
#         write_data = frame_header
        while True:
            line_header_data = fr.read(line_header_size)
            line_main_data = fr.read(line_data_size)
            read_size += line_header_size + line_data_size

            if line_header_data[1:2] == target_unitID and line_header_data[4:8] == target_timestamp and \
            line_header_data[0:1] == target_from:
                line_header_replaced = line_header_data[0:1].replace(target_from, target_to)
            else:
                line_header_replaced = line_header_data[0:1]
            write_data += (line_header_replaced + line_header_data[1:8] + line_main_data)

            if line_header_data == b'' or line_main_data == b'':
#                 print('End of File')
                break
        # 置換結果でファイル上書き
        fw = open(filePath, 'wb')
        fw.write(write_data) 
        fw.close()
        fr.close()

def join_file(file_list, filePath):
    with open(filePath, 'wb') as saveFile:
        saveFile.write(b'\xEF\xBE\xAD\xDE')
        for f in file_list:
            data = open(f, 'rb').read()
            saveFile.write(data)
            saveFile.flush()

def main():

    # 初期化
    frame_header_size = 4 # DEADBEEF
    line_header_size = 8 # ラインデータのヘッダ部 12
    line_data_size = 4 # ラインデータ本体 1280
    chunk_size = 12 # ファイル分割サイズ
    
    input_file = 'input.DMP'
    output_file = './output.DMP' 
    chunk_path = './chunk_data'
    output_data = b''
    count = 0
    
    os.mkdir(chunk_path)
    
    #データ入力
    target_unitID = int(input('Enter target_unitID:')).to_bytes(1, 'little')
    target_timestamp = int(input('Enter timestamp:')).to_bytes(4, 'big') # 67438087
    target_from = int(input('Enter angleID From:')).to_bytes(1, 'little')
    target_to = int(input('Enter angleID To:')).to_bytes(1, 'little')

    # ファイル分割
    file_list = divide_file(input_file, frame_header_size, chunk_size, chunk_path)
    
    # ファイル捜査しhitしたら置換処理を行う
    for filePath in file_list:
        count += 1
        print(count, '/', (len(file_list))) if count%2 == 0 else 'odd'
        result = check_string(filePath, target_timestamp) # ファイル捜査
        if result: # hitしたものに対し置換処理
            FCC(filePath, frame_header_size, line_header_size, line_data_size, \
                target_unitID, target_timestamp, target_from, target_to)
    
    join_file(file_list, output_file)
    
    shutil.rmtree(chunk_path)
    
    input("処理完了しました。キー入力待ち。 ")
            
if __name__ == "__main__":
    main()


Enter target_unitID: 3
Enter timestamp: 67438087
Enter angleID From: 239
Enter angleID To: 237


2 / 5
4 / 5


処理完了しました。キー入力待ち。  


#### 08. 実DMPファイル用のチューニング

In [51]:
import os
import shutil
import struct
    
def divide_file(filePath, frame_header_size, chunkSize, chunk_path):
    readedDataSize = frame_header_size
    i = 0
    fileList = []
    # 対象ファイルを開く
    f = open(filePath, "rb")
    # ファイルを読み終わるまで繰り返す
    contentLength = os.path.getsize(filePath)
    while readedDataSize < contentLength:
        # 読み取り位置をシーク
        f.seek(readedDataSize)
        # 指定されたデータサイズだけ読み込む
        data = f.read(chunkSize)
        # 分割ファイルを保存
        saveFilePath = chunk_path + "/" + filePath + "." + str(i)
        with open(saveFilePath, 'wb') as saveFile:
            saveFile.write(data)
        # 読み込んだデータサイズの更新
        readedDataSize = readedDataSize + len(data)
        i = i + 1
        fileList.append(saveFilePath)
    return fileList

def check_string(filePath, timestamp):
    with open(filePath, 'rb') as temp_f:
        if timestamp in temp_f.read():
            return True # The string is found
        return False  # The string does not exist in the file

def FCC(filePath, frame_header_size, line_header_size, line_data_size, \
                target_unitID, target_timestamp, target_from, target_to):
    write_data = b''
    # メイン処理
    with open(filePath, 'rb') as fr:
#         frame_header = fr.read(frame_header_size) #先頭4byteはスキップ  
#         write_data = frame_header
        while True:
            line_header_data = fr.read(line_header_size)
            line_main_data = fr.read(line_data_size)

            if line_header_data[5:6] == target_unitID and line_header_data[8:12] == target_timestamp and \
            line_header_data[4:5] == target_from:
                line_header_replaced = line_header_data[4:5].replace(target_from, target_to)
            else:
                line_header_replaced = line_header_data[4:5]
            write_data += (line_header_data[0:4] + line_header_replaced + line_header_data[5:12] + line_main_data)

            if line_header_data == b'' or line_main_data == b'':
#                 print('End of File')
                break
        # 置換結果でファイル上書き
        fw = open(filePath, 'wb')
        fw.write(write_data) 
        fw.close()
        fr.close()

def join_file(file_list, filePath):
    with open(filePath, 'wb') as saveFile:
        saveFile.write(b'\xEF\xBE\xAD\xDE')
        for f in file_list:
            data = open(f, 'rb').read()
            saveFile.write(data)
            saveFile.flush()

def main():

    # 初期化
    frame_header_size = 4 # DEADBEEF
    line_header_size = 12 # ラインデータのヘッダ部 12
    line_data_size = 1280 # ラインデータ本体 1280
    chunk_size = (line_header_size + line_data_size) * 200 * 8 * 1 # ファイル分割サイズ 8ch x 1frm
    
    input_file = 'input.DMP'
    output_file = './output.DMP' 
    chunk_path = './chunk_data'
    output_data = b''
    count = 0
    
    os.mkdir(chunk_path)
    
    #データ入力
    target_unitID = int(input('Enter target_unitID:')).to_bytes(1, 'little')
    target_timestamp = int(input('Enter timestamp:')).to_bytes(4, 'big') # 67438087
    target_from = int(input('Enter angleID From:')).to_bytes(1, 'little')
    target_to = int(input('Enter angleID To:')).to_bytes(1, 'little')

    # ファイル分割
    file_list = divide_file(input_file, frame_header_size, chunk_size, chunk_path)
    
    # ファイル捜査しhitしたら置換処理を行う
    for filePath in file_list:
        count += 1
        print('Progress: ', count, '/', (len(file_list))) if count%5 == 0 else 'odd'
        result = check_string(filePath, target_timestamp) # ファイル捜査
        if result: # hitしたものに対し置換処理
            FCC(filePath, frame_header_size, line_header_size, line_data_size, \
                target_unitID, target_timestamp, target_from, target_to)
    
    # 分割したファイルを結合する
    join_file(file_list, output_file)
    
    shutil.rmtree(chunk_path)
    
    input("処理完了しました。キー入力待ち。 ")
            
if __name__ == "__main__":
    main()


NameError: name 'chunk_path' is not defined

In [50]:
fr = open('input.DMP', 'rb')
read_data = fr.read()
print(read_data[5:])
fr.close()

b'\x01'
