Skip to content

Commit

Permalink
Merge branch 'bugfix/fix-corrpted-content-full-partition' into 'master'
Browse files Browse the repository at this point in the history
fatfsparse.py: Limit the file content size to correct number

Closes IDF-5947

See merge request espressif/esp-idf!20048
  • Loading branch information
sio13 committed Sep 12, 2022
2 parents d6d8324 + 37178df commit 890a84d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 21 deletions.
13 changes: 10 additions & 3 deletions components/fatfs/fatfs_utils/entry.py
Expand Up @@ -34,8 +34,9 @@ class Entry:
# one entry can hold 13 characters with size 2 bytes distributed in three regions of the 32 bytes entry
CHARS_PER_ENTRY: int = LDIR_Name1_SIZE + LDIR_Name2_SIZE + LDIR_Name3_SIZE

# the last 16 bytes record in the LFN entry has first byte masked with the following value
LAST_RECORD_LFN_ENTRY: int = 0x40
SHORT_ENTRY: int = -1

# this value is used for short-like entry but with accepted lower case
SHORT_ENTRY_LN: int = 0

Expand Down Expand Up @@ -103,7 +104,7 @@ def _build_entry_long(names: List[bytes], checksum: int, order: int, is_last: bo
00002040: 54 48 49 53 49 53 7E 31 54 58 54 20 00 00 00 00 THISIS~1TXT.....
00002050: 21 00 00 00 00 00 00 00 21 00 02 00 15 00 00 00 !.......!.......
"""
order |= (0x40 if is_last else 0x00)
order |= (Entry.LAST_RECORD_LFN_ENTRY if is_last else 0x00)
long_entry: bytes = (Int8ul.build(order) + # order of the long name entry (possibly masked with 0x40)
names[0] + # first 5 characters (10 bytes) of the name part
Int8ul.build(Entry.ATTR_LONG_NAME) + # one byte entity type ATTR_LONG_NAME
Expand All @@ -124,7 +125,13 @@ def parse_entry_long(entry_bytes_: bytes, my_check: int) -> dict:
return {}
names1 = entry_bytes_[14:26]
names2 = entry_bytes_[28:32]
return {'order': order_, 'name1': names0, 'name2': names1, 'name3': names2, 'is_last': bool(order_ & 0x40 == 0x40)}
return {
'order': order_,
'name1': names0,
'name2': names1,
'name3': names2,
'is_last': bool(order_ & Entry.LAST_RECORD_LFN_ENTRY == Entry.LAST_RECORD_LFN_ENTRY)
}

@property
def entry_bytes(self) -> bytes:
Expand Down
29 changes: 16 additions & 13 deletions components/fatfs/fatfs_utils/fat.py
@@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

from typing import List
from typing import List, Optional

from .cluster import Cluster
from .exceptions import NoFreeClusterException
Expand Down Expand Up @@ -39,22 +39,25 @@ def is_cluster_last(self, cluster_id_: int) -> bool:
is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1
return is_cluster_last_

def chain_content(self, cluster_id_: int) -> bytearray:
bin_im: bytearray = self.boot_sector_state.binary_image
if self.is_cluster_last(cluster_id_):
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
content_: bytearray = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size]
return content_
fat_value_: int = self.get_cluster_value(cluster_id_)
def get_chained_content(self, cluster_id_: int, size: Optional[int] = None) -> bytearray:
"""
The purpose of the method is retrieving the content from chain of clusters when the FAT FS partition
is analyzed. The file entry provides the reference to the first cluster, this method
traverses linked list of clusters and append partial results to the content.
"""
binary_image: bytearray = self.boot_sector_state.binary_image

data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
content_ = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size]
content_ = binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size]

while not self.is_cluster_last(cluster_id_):
cluster_id_ = fat_value_
fat_value_ = self.get_cluster_value(cluster_id_)
cluster_id_ = self.get_cluster_value(cluster_id_)
data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_)
content_ += bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size]
return content_
content_ += binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size]
# the size is None if the object is directory
if size is None:
return content_
return content_[:size]

def find_free_cluster(self) -> Cluster:
# finds first empty cluster and allocates it
Expand Down
13 changes: 10 additions & 3 deletions components/fatfs/fatfs_utils/fatfs_state.py
Expand Up @@ -5,8 +5,9 @@
from typing import Optional

from .exceptions import InconsistentFATAttributes
from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS, FATDefaults,
get_fat_sectors_count, get_fatfs_type, get_non_data_sectors_cnt, number_of_clusters)
from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS,
RESERVED_CLUSTERS_COUNT, FATDefaults, get_fat_sectors_count, get_fatfs_type,
get_non_data_sectors_cnt, number_of_clusters)


class FATFSState:
Expand Down Expand Up @@ -133,7 +134,13 @@ def fatfs_type(self) -> int:

@property
def clusters(self) -> int:
clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster)
"""
The actual number of clusters is calculated by `number_of_clusters`,
however, the initial two blocks of FAT are reserved (device type and root directory),
despite they don't refer to the data region.
Since that, two clusters are added to use the full potential of the FAT file system partition.
"""
clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster) + RESERVED_CLUSTERS_COUNT
return clusters_cnt_

@property
Expand Down
1 change: 1 addition & 0 deletions components/fatfs/fatfs_utils/utils.py
Expand Up @@ -12,6 +12,7 @@

FAT12_MAX_CLUSTERS: int = 4085
FAT16_MAX_CLUSTERS: int = 65525
RESERVED_CLUSTERS_COUNT: int = 2
PAD_CHAR: int = 0x20
FAT12: int = 12
FAT16: int = 16
Expand Down
5 changes: 3 additions & 2 deletions components/fatfs/fatfsparse.py
Expand Up @@ -67,14 +67,15 @@ def traverse_folder_tree(directory_bytes_: bytes,
entry_position_=i,
lfn_checksum_=lfn_checksum(obj_['DIR_Name'] + obj_['DIR_Name_ext']))
if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE:
content_ = fat_.chain_content(cluster_id_=Entry.get_cluster_id(obj_)).rstrip(chr(0x00).encode())
content_ = fat_.get_chained_content(cluster_id_=Entry.get_cluster_id(obj_),
size=obj_['DIR_FileSize'])
with open(os.path.join(name, obj_name_), 'wb') as new_file:
new_file.write(content_)
elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY:
# avoid creating symlinks to itself and parent folder
if obj_name_ in ('.', '..'):
continue
child_directory_bytes_ = fat_.chain_content(cluster_id_=obj_['DIR_FstClusLO'])
child_directory_bytes_ = fat_.get_chained_content(cluster_id_=obj_['DIR_FstClusLO'])
traverse_folder_tree(directory_bytes_=child_directory_bytes_,
name=os.path.join(name, obj_name_),
state_=state_,
Expand Down

0 comments on commit 890a84d

Please sign in to comment.