-
Notifications
You must be signed in to change notification settings - Fork 7.2k
/
fatfsparse.py
executable file
·167 lines (142 loc) · 7.74 KB
/
fatfsparse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import construct
from fatfs_utils.boot_sector import BootSector
from fatfs_utils.entry import Entry
from fatfs_utils.fat import FAT
from fatfs_utils.fatfs_state import BootSectorState
from fatfs_utils.utils import FULL_BYTE, LONG_NAMES_ENCODING, PAD_CHAR, FATDefaults, lfn_checksum, read_filesystem
from wl_fatfsgen import remove_wl
def build_file_name(name1: bytes, name2: bytes, name3: bytes) -> str:
    """
    Decode the three name fields of a single long-file-name record into text.

    The fields are concatenated in the given order, trailing FULL_BYTE padding
    is removed, the rest is decoded with LONG_NAMES_ENCODING and, finally, the
    trailing null-terminating characters ('\\x00') are dropped.
    """
    raw_name = b''.join((name1, name2, name3))
    # padding bytes have to go before decoding, the terminator only after it
    without_padding = raw_name.rstrip(FULL_BYTE)
    decoded = without_padding.decode(LONG_NAMES_ENCODING)
    return decoded.rstrip('\x00')
def get_obj_name(obj_: dict, directory_bytes_: bytes, entry_position_: int, lfn_checksum_: int) -> str:
    """
    Return the name of a directory record, preferring the long file name (LFN) if one is stored.

    obj_ is the parsed short (8.3) entry; 'DIR_Name', 'DIR_Name_ext' and 'DIR_NTRes' are read from it.
    directory_bytes_ holds the raw bytes of the directory containing the entry and entry_position_
    is the index of the entry in that directory (in units of FATDefaults.ENTRY_SIZE).
    lfn_checksum_ is handed to Entry.parse_entry_long to match LFN records to this short entry.

    The short name is returned when long names are switched off (module-level `args.long_name_support`,
    set in the `__main__` section), when the entry is marked as SFN-only, or when no LFN record
    directly precedes the entry.
    """
    extension = obj_['DIR_Name_ext'].rstrip(chr(PAD_CHAR))
    suffix = f'.{extension}' if extension else ''
    short_name: str = obj_['DIR_Name'].rstrip(chr(PAD_CHAR)) + suffix  # short entry name

    # if LFN was detected, the record is considered as single SFN record only if DIR_NTRes == 0x18 (LDIR_DIR_NTRES)
    # if LFN was not detected, the record cannot be part of the LFN, no matter the value of DIR_NTRes
    if not args.long_name_support or obj_['DIR_NTRes'] == Entry.LDIR_DIR_NTRES:
        return short_name

    fragments = {}
    for pos in range(entry_position_ - 1, -1, -1):  # walk from the record before the short entry backwards
        start = FATDefaults.ENTRY_SIZE * pos
        record = Entry.parse_entry_long(directory_bytes_[start: start + FATDefaults.ENTRY_SIZE], lfn_checksum_)
        if not record:
            # LFN records have to directly precede their short entry, so the first record that is not
            # a matching LFN record ends the chain; scanning further could pick up fragments that
            # belong to an earlier file whose checksum happens to be the same
            break
        fragments[record['order']] = build_file_name(record['name1'], record['name2'], record['name3'])
        if record['is_last']:
            break

    # fragments are put together in ascending order of their 'order' value
    return ''.join(fragments[order] for order in sorted(fragments)) or short_name
def traverse_folder_tree(directory_bytes_: bytes,
                         name: str,
                         state_: BootSectorState,
                         fat_: FAT,
                         binary_array_: bytes) -> None:
    """
    Recreate one directory of the image, including its subdirectories, under the host path `name`.

    directory_bytes_ are the raw bytes of the directory, its length has to be a multiple of
    FATDefaults.ENTRY_SIZE. fat_ is used for reading the content of files and subdirectories.
    state_ and binary_array_ are not read here, they are only passed on to the recursive calls.

    The host directory is created with os.makedirs, so the call fails if `name` already exists.
    """
    os.makedirs(name)

    entry_size = FATDefaults.ENTRY_SIZE
    assert len(directory_bytes_) % entry_size == 0

    for index in range(len(directory_bytes_) // entry_size):
        offset = entry_size * index
        raw_entry = directory_bytes_[offset: offset + entry_size]
        try:
            entry: dict = Entry.ENTRY_FORMAT_SHORT_NAME.parse(raw_entry)
        except (construct.core.ConstError, UnicodeDecodeError, construct.core.StringError):
            # the record cannot be parsed as a short entry: switch the long names support on and skip it
            args.long_name_support = True
            continue

        attribute = entry['DIR_Attr']
        if attribute == 0:  # empty entry
            continue

        checksum = lfn_checksum(entry['DIR_Name'] + entry['DIR_Name_ext'])
        entry_name: str = get_obj_name(entry, directory_bytes_, entry_position_=index, lfn_checksum_=checksum)
        host_path = os.path.join(name, entry_name)

        if attribute == Entry.ATTR_ARCHIVE:
            # regular file: the cluster chain is read only if the file is not empty
            data = b''
            if entry['DIR_FileSize'] > 0:
                data = fat_.get_chained_content(cluster_id_=Entry.get_cluster_id(entry),
                                                size=entry['DIR_FileSize'])
            with open(host_path, 'wb') as extracted:
                extracted.write(data)
            continue

        if attribute != Entry.ATTR_DIRECTORY:
            continue

        # avoid creating symlinks to itself and parent folder
        if entry_name in ('.', '..'):
            continue

        traverse_folder_tree(directory_bytes_=fat_.get_chained_content(cluster_id_=entry['DIR_FstClusLO']),
                             name=host_path,
                             state_=state_,
                             fat_=fat_,
                             binary_array_=binary_array_)
def remove_wear_levelling_if_exists(fs_: bytes) -> bytes:
    """
    Return the image without the wear levelling layer, if such a layer is detected.

    The detection has two steps:
        1) the first sector has to be parsed as a valid boot sector
        2) the size defined in the boot sector is compared with the size of the image:
            - equal sizes mean there is no wear levelling layer and the image is returned as it is
            - otherwise the wear levelling is removed for further processing
    When the first step fails (construct.core.ConstError), the wear levelling is removed as well.
    """
    try:
        probe = BootSector()
        probe.parse_boot_sector(fs_)
        is_plain_fs = probe.boot_sector_state.size == len(fs_)
    except construct.core.ConstError:
        # the first sector is not a valid boot sector
        is_plain_fs = False

    if is_plain_fs:
        return fs_
    return remove_wl(fs_)
if __name__ == '__main__':
    cli = argparse.ArgumentParser(
        description='Tool for parsing fatfs image and extracting directory structure on host.')
    cli.add_argument('input_image', help='Path to the image that will be parsed and extracted.')
    # hidden switch; it is read by get_obj_name and also turned on by traverse_folder_tree
    cli.add_argument('--long-name-support', action='store_true', help=argparse.SUPPRESS)
    # hidden switch ensuring backward compatibility, it is handled as `--wl-layer enabled`
    cli.add_argument('--wear-leveling', action='store_true', help=argparse.SUPPRESS)
    cli.add_argument('--wl-layer',
                     choices=['detect', 'enabled', 'disabled'],
                     default=None,
                     help="If detection doesn't work correctly, "
                          'you can force analyzer to or not to assume WL.')

    # `args` has to stay a module-level name, get_obj_name and traverse_folder_tree read it as a global
    args = cli.parse_args()

    # If wear levelling is detected or the user explicitly sets `--wl-layer enabled`,
    # the partition with wear levelling is transformed to a partition without WL for convenient parsing.
    # In some cases the partitions with and without wear levelling can be 100% equivalent
    # and only the user can break this tie by explicitly setting
    # the parameter --wl-layer to enabled, respectively disabled.
    if args.wear_leveling:
        if args.wl_layer:
            raise NotImplementedError('Argument --wear-leveling cannot be combined with --wl-layer!')
        args.wl_layer = 'enabled'
    elif not args.wl_layer:
        args.wl_layer = 'detect'

    image = read_filesystem(args.input_image)

    # An algorithm for removing wear levelling:
    # 1. find and remove the dummy sector:
    #    a) the dummy sector is at the position defined by the number of records in the state sector
    #    b) the dummy may not be placed in state nor cfg sectors
    #    c) the first (boot) sector position (boot_s_pos) is calculated using the value of move count
    #       boot_s_pos = - mc
    # 2. remove state sectors (trivial)
    # 3. remove cfg sector (trivial)
    # 4. valid fs is then old_fs[-mc:] + old_fs[:-mc]
    if args.wl_layer == 'enabled':
        image = remove_wl(image)
    elif args.wl_layer == 'detect':
        # wear levelling is removed to enable parsing using common algorithm
        image = remove_wear_levelling_if_exists(image)
    # with `disabled` the image is parsed as it is

    boot_sector = BootSector()
    boot_sector.parse_boot_sector(image)
    state = boot_sector.boot_sector_state
    fat_table = FAT(state, init_=False)

    # the root directory is cut out of the image using the values from the boot sector
    root_dir_begin = state.root_directory_start
    root_dir_end = root_dir_begin + state.root_dir_sectors_cnt * state.sector_size
    root_dir_bytes = image[root_dir_begin: root_dir_end]

    # the output directory is named after the volume label (without the padding)
    traverse_folder_tree(root_dir_bytes,
                         state.volume_label.rstrip(chr(PAD_CHAR)),
                         state, fat_table, image)