Skip to content

Commit

Permalink
Merge branch 'feature/add-test-coverage-with-review' into 'master'
Browse files Browse the repository at this point in the history
fatfs: add test coverage and comments to the fatfsgen.py related code

Closes IDF-5864

See merge request espressif/esp-idf!19986
  • Loading branch information
pacucha42 committed Sep 29, 2022
2 parents 47fa643 + 943f964 commit e82adaa
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 56 deletions.
26 changes: 18 additions & 8 deletions components/fatfs/fatfs_utils/boot_sector.py
Expand Up @@ -3,7 +3,7 @@
from inspect import getmembers, isroutine
from typing import Optional

from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct, core

from .exceptions import InconsistentFATAttributes, NotInitialized
from .fatfs_state import BootSectorState
Expand Down Expand Up @@ -56,7 +56,7 @@ class BootSector:
assert BOOT_SECTOR_HEADER.sizeof() == BOOT_HEADER_SIZE

def __init__(self, boot_sector_state: Optional[BootSectorState] = None) -> None:
self._parsed_header = None
self._parsed_header: dict = {}
self.boot_sector_state: BootSectorState = boot_sector_state

def generate_boot_sector(self) -> None:
Expand Down Expand Up @@ -97,8 +97,12 @@ def generate_boot_sector(self) -> None:
)

def parse_boot_sector(self, binary_data: bytes) -> None:
self._parsed_header = BootSector.BOOT_SECTOR_HEADER.parse(binary_data)
if self._parsed_header is None:
"""
Checks the validity of the boot sector and derives the metadata from boot sector to the structured shape.
"""
try:
self._parsed_header = BootSector.BOOT_SECTOR_HEADER.parse(binary_data)
except core.StreamError:
raise NotInitialized('The boot sector header is not parsed successfully!')

if self._parsed_header['BPB_TotSec16'] != 0x00:
Expand Down Expand Up @@ -141,9 +145,14 @@ def parse_boot_sector(self, binary_data: bytes) -> None:
assert self.boot_sector_state.file_sys_type in (f'FAT{self.boot_sector_state.fatfs_type} ', 'FAT ')

def __str__(self) -> str:
if self._parsed_header is None:
"""
FATFS properties parser (internal helper tool for fatfsgen.py/fatfsparse.py)
Provides all the properties of given FATFS instance by parsing its boot sector (returns formatted string)
"""

if self._parsed_header == {}:
return 'Boot sector is not initialized!'
res: str = 'Properties of the FATFS:\n'
res: str = 'FATFS properties:\n'
for member in getmembers(self.boot_sector_state, lambda a: not (isroutine(a))):
prop_ = getattr(self.boot_sector_state, member[0])
if isinstance(prop_, int) or isinstance(prop_, str) and not member[0].startswith('_'):
Expand All @@ -152,7 +161,8 @@ def __str__(self) -> str:

@property
def binary_image(self) -> bytes:
if len(self.boot_sector_state.binary_image) == 0:
raise NotInitialized('Boot sector is not generated nor initialized!')
# when BootSector is not instantiated, self.boot_sector_state might be None
if self.boot_sector_state is None or len(self.boot_sector_state.binary_image) == 0:
raise NotInitialized('Boot sector is not initialized!')
bin_image_: bytes = self.boot_sector_state.binary_image
return bin_image_
63 changes: 50 additions & 13 deletions components/fatfs/fatfs_utils/cluster.py
Expand Up @@ -30,6 +30,14 @@ def __init__(self,
cluster_id: int,
boot_sector_state: BootSectorState,
init_: bool) -> None:
"""
Initially, if init_ is False, the cluster is virtual and is not allocated (doesn't do changes in the FAT).
:param cluster_id: the cluster ID - a key value linking the file's cluster,
the corresponding physical cluster (data region) and the FAT table cluster.
:param boot_sector_state: auxiliary structure holding the file-system's metadata
:param init_: True for allocation the cluster on instantiation, otherwise False.
:returns: None
"""
self.id: int = cluster_id
self.boot_sector_state: BootSectorState = boot_sector_state

Expand All @@ -50,8 +58,19 @@ def next_cluster(self): # type: () -> Optional[Cluster]
def next_cluster(self, value): # type: (Optional[Cluster]) -> None
self._next_cluster = value

def _cluster_id_to_logical_position_in_bits(self, _id: int) -> int:
# computes address of the cluster in fat table
def _cluster_id_to_fat_position_in_bits(self, _id: int) -> int:
"""
This private method calculates the position of the memory block (cluster) in the FAT table.
:param _id: the cluster ID - a key value linking the file's cluster,
the corresponding physical cluster (data region) and the FAT table cluster.
:returns: bit offset of the cluster in FAT
e.g.:
00003000: 42 65 00 2E 00 74 00 78 00 74 00 0F 00 43 FF FF
For FAT12 the third cluster has value = 0x02E and ID = 2.
Its bit-address is 24 (24 bits preceding, 0-indexed), because 0x2E starts at the bit-offset 24.
"""
logical_position_: int = self.boot_sector_state.fatfs_type * _id
return logical_position_

Expand All @@ -73,18 +92,10 @@ def compute_cluster_data_address(boot_sector_state: BootSectorState, id_: int) -
def _compute_cluster_data_address(self) -> int:
return self.compute_cluster_data_address(self.boot_sector_state, self.id)

def _set_left_half_byte(self, address: int, value: int) -> None:
self.boot_sector_state.binary_image[address] &= 0x0f
self.boot_sector_state.binary_image[address] |= value << 4

def _set_right_half_byte(self, address: int, value: int) -> None:
self.boot_sector_state.binary_image[address] &= 0xf0
self.boot_sector_state.binary_image[address] |= value

@property
def fat_cluster_address(self) -> int:
"""Determines how many bits precede the first bit of the cluster in FAT"""
return self._cluster_id_to_logical_position_in_bits(self.id)
return self._cluster_id_to_fat_position_in_bits(self.id)

@property
def real_cluster_address(self) -> int:
Expand Down Expand Up @@ -141,6 +152,27 @@ def set_in_fat(self, value: int) -> None:
2. if the cluster index is odd, we set the first half of the computed byte and the full consequent byte.
Order of half bytes is 1, 3, 2.
"""

def _set_msb_half_byte(address: int, value_: int) -> None:
"""
Sets 4 most significant bits (msb half-byte) of 'boot_sector_state.binary_image' at given
'address' to 'value_' (size of variable 'value_' is half byte)
If a byte contents is 0b11110000, the msb half-byte would be 0b1111
"""
self.boot_sector_state.binary_image[address] &= 0x0f
self.boot_sector_state.binary_image[address] |= value_ << 4

def _set_lsb_half_byte(address: int, value_: int) -> None:
"""
Sets 4 least significant bits (lsb half-byte) of 'boot_sector_state.binary_image' at given
'address' to 'value_' (size of variable 'value_' is half byte)
If a byte contents is 0b11110000, the lsb half-byte would be 0b0000
"""
self.boot_sector_state.binary_image[address] &= 0xf0
self.boot_sector_state.binary_image[address] |= value_

# value must fit into number of bits of the fat (12, 16 or 32)
assert value <= (1 << self.boot_sector_state.fatfs_type) - 1
half_bytes = split_by_half_byte_12_bit_little_endian(value)
Expand All @@ -151,17 +183,22 @@ def set_in_fat(self, value: int) -> None:
if self.fat_cluster_address % 8 == 0:
# even block
bin_img_[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0])
self._set_right_half_byte(self.real_cluster_address + 1, half_bytes[2])
_set_lsb_half_byte(self.real_cluster_address + 1, half_bytes[2])
elif self.fat_cluster_address % 8 != 0:
# odd block
self._set_left_half_byte(self.real_cluster_address, half_bytes[0])
_set_msb_half_byte(self.real_cluster_address, half_bytes[0])
bin_img_[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1])
elif self.boot_sector_state.fatfs_type == FAT16:
bin_img_[self.real_cluster_address:self.real_cluster_address + 2] = Int16ul.build(value)
assert self.get_from_fat() == value

@property
def is_root(self) -> bool:
"""
The FAT12/FAT16 contains only one root directory,
the root directory allocates the first cluster with the ID `ROOT_BLOCK_ID`.
The method checks if the cluster belongs to the root directory.
"""
return self.id == Cluster.ROOT_BLOCK_ID

def allocate_cluster(self) -> None:
Expand Down
3 changes: 3 additions & 0 deletions components/fatfs/fatfs_utils/exceptions.py
Expand Up @@ -48,4 +48,7 @@ class FatalError(Exception):


class InconsistentFATAttributes(Exception):
"""
Caused by e.g. wrong number of clusters for given FAT type
"""
pass
40 changes: 31 additions & 9 deletions components/fatfs/fatfs_utils/fat.py
Expand Up @@ -22,7 +22,7 @@ def allocate_root_dir(self) -> None:
self.clusters[Cluster.ROOT_BLOCK_ID].allocate_cluster()

def __init__(self, boot_sector_state: BootSectorState, init_: bool) -> None:
self._first_free_cluster_id = 0
self._first_free_cluster_id = 1
self.boot_sector_state = boot_sector_state
self.clusters: List[Cluster] = [Cluster(cluster_id=i,
boot_sector_state=self.boot_sector_state,
Expand All @@ -31,10 +31,22 @@ def __init__(self, boot_sector_state: BootSectorState, init_: bool) -> None:
self.allocate_root_dir()

def get_cluster_value(self, cluster_id_: int) -> int:
"""
The method retrieves the values of the FAT memory block.
E.g. in case of FAT12:
00000000: F8 FF FF 55 05 00 00 00 00 00 00 00 00 00 00 00
The reserved value is 0xFF8, the value of first cluster if 0xFFF, thus is last in chain,
and the value of the second cluster is 0x555, so refers to the cluster number 0x555.
"""
fat_cluster_value_: int = self.clusters[cluster_id_].get_from_fat()
return fat_cluster_value_

def is_cluster_last(self, cluster_id_: int) -> bool:
"""
Checks if the cluster is last in its cluster chain. If the value of the cluster is
0xFFF for FAT12, 0xFFFF for FAT16 or 0xFFFFFFFF for FAT32, the cluster is the last.
"""
value_ = self.get_cluster_value(cluster_id_)
is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1
return is_cluster_last_
Expand All @@ -60,16 +72,26 @@ def get_chained_content(self, cluster_id_: int, size: Optional[int] = None) -> b
return content_[:size]

def find_free_cluster(self) -> Cluster:
# finds first empty cluster and allocates it
for cluster_id, cluster in enumerate(self.clusters[self._first_free_cluster_id:],
start=self._first_free_cluster_id):
if cluster.is_empty:
cluster.allocate_cluster()
self._first_free_cluster_id = cluster_id
return cluster
raise NoFreeClusterException('No free cluster available!')
"""
Returns the first free cluster and increments value of `self._first_free_cluster_id`.
The method works only in context of creating a partition from scratch.
In situations where the clusters are allocated and freed during the run of the program,
might the method cause `Out of space` error despite there would be free clusters.
"""

if self._first_free_cluster_id + 1 >= len(self.clusters):
raise NoFreeClusterException('No free cluster available!')
cluster = self.clusters[self._first_free_cluster_id + 1]
if not cluster.is_empty:
raise NoFreeClusterException('No free cluster available!')
cluster.allocate_cluster()
self._first_free_cluster_id += 1
return cluster

def allocate_chain(self, first_cluster: Cluster, size: int) -> None:
"""
Allocates the linked list of clusters needed for the given file or directory.
"""
current = first_cluster
for _ in range(size - 1):
free_cluster = self.find_free_cluster()
Expand Down
25 changes: 23 additions & 2 deletions components/fatfs/fatfsgen.py
Expand Up @@ -79,7 +79,17 @@ def create_file(self, name: str,
extension: str = '',
path_from_root: Optional[List[str]] = None,
object_timestamp_: datetime = FATFS_INCEPTION) -> None:
# when path_from_root is None the dir is root
"""
Root directory recursively finds the parent directory of the new file, allocates cluster,
entry and appends a new file into the parent directory.
When path_from_root is None the dir is root.
:param name: The name of the file.
:param extension: The extension of the file.
:param path_from_root: List of strings containing names of the ancestor directories in the given order.
:param object_timestamp_: is not None, this will be propagated to the file's entry
"""
self.root_directory.new_file(name=name,
extension=extension,
path_from_root=path_from_root,
Expand All @@ -88,7 +98,18 @@ def create_file(self, name: str,
def create_directory(self, name: str,
path_from_root: Optional[List[str]] = None,
object_timestamp_: datetime = FATFS_INCEPTION) -> None:
# when path_from_root is None the dir is root
"""
Initially recursively finds a parent of the new directory
and then create a new directory inside the parent.
When path_from_root is None the parent dir is root.
:param name: The full name of the directory (excluding its path)
:param path_from_root: List of strings containing names of the ancestor directories in the given order.
:param object_timestamp_: in case the user preserves the timestamps, this will be propagated to the
metadata of the directory (to the corresponding entry)
:returns: None
"""
parent_dir = self.root_directory
if path_from_root:
parent_dir = self.root_directory.recursive_search(path_from_root, self.root_directory)
Expand Down
57 changes: 57 additions & 0 deletions components/fatfs/test_fatfsgen/test_fatfsgen.py
Expand Up @@ -12,10 +12,15 @@

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import fatfsgen # noqa E402 # pylint: disable=C0413
from fatfs_utils.boot_sector import BootSector # noqa E402 # pylint: disable=C0413
from fatfs_utils.cluster import Cluster # noqa E402 # pylint: disable=C0413
from fatfs_utils.entry import Entry # noqa E402 # pylint: disable=C0413
from fatfs_utils.exceptions import InconsistentFATAttributes # noqa E402 # pylint: disable=C0413
from fatfs_utils.exceptions import NotInitialized # noqa E402 # pylint: disable=C0413
from fatfs_utils.exceptions import TooLongNameException # noqa E402 # pylint: disable=C0413
from fatfs_utils.exceptions import WriteDirectoryException # noqa E402 # pylint: disable=C0413
from fatfs_utils.exceptions import LowerCaseException, NoFreeClusterException # noqa E402 # pylint: disable=C0413
from fatfs_utils.utils import right_strip_string # noqa E402 # pylint: disable=C0413
from fatfs_utils.utils import FAT12, read_filesystem # noqa E402 # pylint: disable=C0413


Expand Down Expand Up @@ -473,6 +478,58 @@ def test_lfn_increasing(self) -> None:
self.assertEqual(file_system[0x60d0: 0x60e0], b'e\x00l\x00l\x00o\x00h\x00\x00\x00e\x00l\x00')
self.assertEqual(file_system[0x60e0: 0x60f0], b'HELLOH~\x02TXT \x00\x00\x00\x00')

def test_bs_not_initialized(self) -> None:
self.assertEqual(str(BootSector()), 'Boot sector is not initialized!')
self.assertRaises(NotInitialized, BootSector().generate_boot_sector)
self.assertRaises(NotInitialized, lambda: BootSector().binary_image) # encapsulate property to callable

def test_bs_str(self) -> None:
fatfs = fatfsgen.FATFS()
bs = BootSector(fatfs.state.boot_sector_state)
bs.generate_boot_sector()
bs.parse_boot_sector(bs.binary_image)
x = 'FATFS properties:,clusters: 252,data_region_start: 24576,data_sectors: ' \
'250,entries_root_count: 512,fat_table_start_address: 4096,fat_tables_cnt: 1,' \
'fatfs_type: 12,file_sys_type: FAT ,hidden_sectors: 0,media_type: 248,' \
'non_data_sectors: 6,num_heads: 255,oem_name: MSDOS5.0,reserved_sectors_cnt: 1,' \
'root_dir_sectors_cnt: 4,root_directory_start: 8192,sec_per_track: 63,sector_size: 4096,' \
'sectors_count: 256,sectors_per_cluster: 1,sectors_per_fat_cnt: 1,size: 1048576,' \
'volume_label: Espressif ,volume_uuid: 1144419653,'
self.assertEqual(x.split(',')[:-2], str(bs).split('\n')[:-2]) # except for volume id

def test_parsing_error(self) -> None:
self.assertRaises(NotInitialized, BootSector().parse_boot_sector, b'')

def test_not_implemented_fat32(self) -> None:
self.assertEqual(
Entry.get_cluster_id(
Entry.ENTRY_FORMAT_SHORT_NAME.parse(
bytearray(b'AHOJ \x18\x00\xb0[&U&U\x00\x00\xb0[&U\x02\x00\x08\x00\x00\x00'))),
2)

def test_get_cluster_value_from_fat(self) -> None:
fatfs = fatfsgen.FATFS()
self.assertEqual(fatfs.fat.get_cluster_value(1), 0xFFF)

def test_is_cluster_last(self) -> None:
fatfs = fatfsgen.FATFS()
self.assertEqual(fatfs.fat.is_cluster_last(2), False)

def test_chain_in_fat(self) -> None:
fatfs = fatfsgen.FATFS()
self.assertEqual(fatfs.fat.get_chained_content(1), b'\x00' * 0x1000)

def test_retrieve_file_chaining(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('WRITEF', extension='TXT')
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=CFG['sector_size'] * b'a' + b'a')
fatfs.write_filesystem(CFG['output_file'])
self.assertEqual(fatfs.fat.get_chained_content(1)[:15], b'WRITEF TXT \x00\x00\x00')
self.assertEqual(fatfs.fat.get_chained_content(2)[:15], b'aaaaaaaaaaaaaaa')

def test_lstrip(self) -> None:
self.assertEqual(right_strip_string('\x20\x20\x20thisistest\x20\x20\x20'), ' thisistest')


if __name__ == '__main__':
unittest.main()
26 changes: 26 additions & 0 deletions components/fatfs/test_fatfsgen/test_fatfsparse.py
Expand Up @@ -12,6 +12,7 @@

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import fatfsgen # noqa E402 # pylint: disable=C0413
from fatfs_utils.entry import Entry # noqa E402 # pylint: disable=C0413


class FatFSGen(unittest.TestCase):
Expand Down Expand Up @@ -323,6 +324,31 @@ def test_e2e_very_deep_long(self) -> None:
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
assert compare_folders('testf', 'Espressif')

def test_parse_long_name(self) -> None:
self.assertEqual(
Entry.parse_entry_long(
b'\x01t\x00h\x00i\x00s\x00_\x00\x0f\x00\xfbi\x00s\x00_\x00l\x00o\x00n\x00\x00\x00g\x00_\x00', 251),
{
'order': 1,
'name1': b't\x00h\x00i\x00s\x00_\x00',
'name2': b'i\x00s\x00_\x00l\x00o\x00n\x00',
'name3': b'g\x00_\x00',
'is_last': False
}
)
self.assertEqual(
Entry.parse_entry_long(
b'\x01t\x00h\x00i\x00s\x00_\x00\x0f\x00\xfbi\x00s\x00_\x00l\x00o\x00n\x00\x00\x00g\x00_\x00', 252
),
{}
)
self.assertEqual(
Entry.parse_entry_long(
b'\x01t\x00h\x00i\x00s\x00_\x00\x0f\x01\xfbi\x00s\x00_\x00l\x00o\x00n\x00\x00\x00g\x00_\x00', 251
),
{}
)


if __name__ == '__main__':
unittest.main()

0 comments on commit e82adaa

Please sign in to comment.