diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index d3e9e8b..350d0fd 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -12,13 +12,22 @@ jobs: lint: name: Lint code runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python: + - "3.10" + - "3.11" + - "3.12" + - "3.13" + - "3.14" steps: - name: Checkout the Git repository uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v5 with: - python-version: 3.14 + python-version: ${{ matrix.python }} cache: "pip" - name: Lint code shell: bash @@ -82,6 +91,30 @@ jobs: - name: Run test shell: bash run: make test + fuzz: + name: Fuzz + runs-on: ubuntu-latest + needs: [test-image] + strategy: + fail-fast: false + matrix: + python: + #- "3.10" # atheris appears to break + - "3.11" + - "3.12" + - "3.13" + #- "3.14" # not supported by atheris yet + steps: + - name: Checkout the Git repository + uses: actions/checkout@v4 + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + cache: "pip" + - name: Run test + shell: bash + run: make fuzz build: name: Build pip package runs-on: ubuntu-latest @@ -106,7 +139,7 @@ jobs: publish: name: Publish to PyPi if: github.repository == 'Eeems/python-ext4' && github.event_name == 'release' && startsWith(github.ref, 'refs/tags') - needs: [build] + needs: [build, fuzz] runs-on: ubuntu-latest permissions: id-token: write @@ -128,7 +161,7 @@ jobs: release: name: Add pip to release if: github.repository == 'Eeems/python-ext4' && github.event_name == 'release' && startsWith(github.ref, 'refs/tags') - needs: [build] + needs: [build, fuzz] runs-on: ubuntu-latest permissions: contents: write diff --git a/.gitignore b/.gitignore index ad32490..03b01bb 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,6 @@ cython_debug/ *.ext4 *.ext4.tmp +crash-* +timeout-* +corpus/seed/ diff --git a/Makefile b/Makefile index 9f1295a..116f8d2 100644 --- a/Makefile +++ b/Makefile @@ 
-3,7 +3,6 @@ VERSION := $(shell grep -m 1 version pyproject.toml | tr -s ' ' | tr -d "'\":" | PACKAGE := $(shell grep -m 1 name pyproject.toml | tr -s ' ' | tr -d "'\":" | cut -d' ' -f3) OBJ := $(wildcard ${PACKAGE}/**) -OBJ += requirements.txt OBJ += pyproject.toml OBJ += README.md OBJ += LICENSE @@ -19,19 +18,24 @@ else endif endif -ifeq ($(PYTHON),) -PYTHON := python +ifeq ($(FUZZ_TIMEOUT),) +FUZZ_TIMEOUT := 60 endif +.PHONY: clean clean: git clean --force -dX +.PHONY: build build: wheel +.PHONY: release release: wheel sdist +.PHONY: sdist sdist: dist/${PACKAGE}-${VERSION}.tar.gz +.PHONY: wheel wheel: dist/${PACKAGE}-${VERSION}-py3-none-any.whl dist: @@ -39,55 +43,81 @@ dist: dist/${PACKAGE}-${VERSION}.tar.gz: ${VENV_BIN_ACTIVATE} dist $(OBJ) . ${VENV_BIN_ACTIVATE}; \ - $(PYTHON) -m build --sdist + python -m build --sdist dist/${PACKAGE}-${VERSION}-py3-none-any.whl: ${VENV_BIN_ACTIVATE} dist $(OBJ) . ${VENV_BIN_ACTIVATE}; \ - $(PYTHON) -m build --wheel + python -m build --wheel -${VENV_BIN_ACTIVATE}: requirements.txt +${VENV_BIN_ACTIVATE}: pyproject.toml @echo "Setting up development virtual env in .venv" - $(PYTHON) -m venv .venv + python -m venv .venv . ${VENV_BIN_ACTIVATE}; \ - $(PYTHON) -m pip install \ - wheel \ - build \ - ruff \ - basedpyright; \ - $(PYTHON) -m pip install \ - -r requirements.txt + python -m pip install \ + --require-virtualenv \ + --editable \ + .[dev]; +.PHONY: test test: ${VENV_BIN_ACTIVATE} + @. ${VENV_BIN_ACTIVATE}; \ + python -m pip install \ + --require-virtualenv \ + --editable \ + .[test]; $(SHELL) test.sh +.PHONY: fuzz +fuzz: ${VENV_BIN_ACTIVATE} + @. ${VENV_BIN_ACTIVATE}; \ + python -m pip install \ + --require-virtualenv \ + --editable \ + .[fuzz] + . ${VENV_BIN_ACTIVATE};\ + python fuzz.py \ + -rss_limit_mb=2048 \ + -max_total_time=$(FUZZ_TIMEOUT) + +.PHONY: all all: release -lint: $(VENV_BIN_ACTIVATE) +.PHONY: lint +lint: $(VENV_BIN_ACTIVATE); + @. 
${VENV_BIN_ACTIVATE}; \ + python -m pip install \ + --require-virtualenv \ + --editable \ + .[test]; \ + python -m pip install \ + --require-virtualenv \ + --editable \ + .[fuzz] . $(VENV_BIN_ACTIVATE); \ - $(PYTHON) -m ruff check; \ - $(PYTHON) -m basedpyright - -lint-fix: $(VENV_BIN_ACTIVATE) + python -m ruff check; \ + python -m basedpyright + +.PHONY: lint-fix +lint-fix: $(VENV_BIN_ACTIVATE) + @. ${VENV_BIN_ACTIVATE}; \ + python -m pip install \ + --require-virtualenv \ + --editable \ + .[test]; \ + python -m pip install \ + --require-virtualenv \ + --editable \ + .[fuzz] . $(VENV_BIN_ACTIVATE); \ - $(PYTHON) -m ruff check --fix; \ - $(PYTHON) -m basedpyright + python -m ruff check --fix; \ + python -m basedpyright +.PHONY: format format: $(VENV_BIN_ACTIVATE) . $(VENV_BIN_ACTIVATE); \ - $(PYTHON) -m ruff format --diff + python -m ruff format --diff +.PHONY: format-fix format-fix: $(VENV_BIN_ACTIVATE) . $(VENV_BIN_ACTIVATE); \ - $(PYTHON) -m ruff format - -.PHONY: \ - all \ - build \ - clean \ - sdist \ - wheel \ - test \ - lint \ - lint-fix \ - format \ - format-fix + python -m ruff format diff --git a/corpus/1aecead5d62816aad8f6b936e3c3364e0155fcd3 b/corpus/1aecead5d62816aad8f6b936e3c3364e0155fcd3 new file mode 100644 index 0000000..01525cf Binary files /dev/null and b/corpus/1aecead5d62816aad8f6b936e3c3364e0155fcd3 differ diff --git a/corpus/efb6064ecb642140b2a63c1fa90f5a32591e3857 b/corpus/efb6064ecb642140b2a63c1fa90f5a32591e3857 new file mode 100644 index 0000000..6a3fd83 Binary files /dev/null and b/corpus/efb6064ecb642140b2a63c1fa90f5a32591e3857 differ diff --git a/ext4/__init__.py b/ext4/__init__.py index ecc30df..36e3d13 100644 --- a/ext4/__init__.py +++ b/ext4/__init__.py @@ -1,84 +1,110 @@ -from .enum import DX_HASH -from .enum import EXT2_FLAGS -from .enum import EXT4_BG -from .enum import EXT4_CHKSUM -from .enum import EXT4_DEFM -from .enum import EXT4_ERRORS -from .enum import EXT4_FEATURE_COMPAT -from .enum import EXT4_FEATURE_INCOMPAT -from 
.enum import EXT4_FEATURE_RO_COMPAT -from .enum import EXT4_FL -from .enum import EXT4_FS -from .enum import EXT4_FT -from .enum import EXT4_INO -from .enum import EXT4_MOUNT -from .enum import EXT4_MOUNT2 -from .enum import EXT4_OS -from .enum import EXT4_REV -from .enum import FS_ENCRYPTION_MODE -from .enum import MODE - -from .superblock import Superblock - +from .block import ( + BlockIO, + BlockIOBlocks, +) from .blockdescriptor import BlockDescriptor - -from .inode import BlockDevice -from .inode import CharacterDevice -from .inode import Directory -from .inode import Fifo -from .inode import File -from .inode import Hurd1 -from .inode import Hurd2 -from .inode import Inode -from .inode import Linux1 -from .inode import Linux2 -from .inode import Masix1 -from .inode import Masix2 -from .inode import Osd1 -from .inode import Osd2 -from .inode import Socket -from .inode import SymbolicLink - -from .volume import Volume -from .volume import InvalidStreamException - -from .extent import Extent -from .extent import ExtentBlocks -from .extent import ExtentHeader -from .extent import ExtentIndex -from .extent import ExtentTail - -from .struct import MagicError -from .struct import ChecksumError - -from .block import BlockIO -from .block import BlockIOBlocks - -from .directory import DirectoryEntry -from .directory import DirectoryEntry2 -from .directory import DirectoryEntryTail -from .directory import DirectoryEntryHash -from .directory import EXT4_NAME_LEN -from .directory import EXT4_DIR_PAD -from .directory import EXT4_DIR_ROUND -from .directory import EXT4_MAX_REC_LEN - -from .xattr import ExtendedAttributeError -from .xattr import ExtendedAttributeIBodyHeader -from .xattr import ExtendedAttributeHeader -from .xattr import ExtendedAttributeEntry - -from .htree import DXRoot -from .htree import DotDirectoryEntry2 -from .htree import DXEntry -from .htree import DXRootInfo +from .directory import ( + EXT4_DIR_PAD, + EXT4_DIR_ROUND, + EXT4_MAX_REC_LEN, + 
EXT4_NAME_LEN, + DirectoryEntry, + DirectoryEntry2, + DirectoryEntryHash, + DirectoryEntryTail, +) +from .enum import ( + DX_HASH, + EXT2_FLAGS, + EXT4_BG, + EXT4_CHKSUM, + EXT4_DEFM, + EXT4_ERRORS, + EXT4_FEATURE_COMPAT, + EXT4_FEATURE_INCOMPAT, + EXT4_FEATURE_RO_COMPAT, + EXT4_FL, + EXT4_FS, + EXT4_FT, + EXT4_INO, + EXT4_MOUNT, + EXT4_MOUNT2, + EXT4_OS, + EXT4_REV, + FS_ENCRYPTION_MODE, + MODE, +) +from .extent import ( + Extent, + ExtentBlocks, + ExtentHeader, + ExtentIndex, + ExtentTail, +) +from .htree import ( + DotDirectoryEntry2, + DXEntry, + DXRoot, + DXRootInfo, +) +from .inode import ( + BlockDevice, + CharacterDevice, + Directory, + Fifo, + File, + Hurd1, + Hurd2, + Inode, + InodeError, + Linux1, + Linux2, + Masix1, + Masix2, + Osd1, + Osd2, + Socket, + SymbolicLink, +) +from .struct import ( + ChecksumError, + MagicError, +) +from .superblock import Superblock +from .volume import ( + InvalidStreamException, + Volume, +) +from .xattr import ( + ExtendedAttributeEntry, + ExtendedAttributeError, + ExtendedAttributeHeader, + ExtendedAttributeIBodyHeader, +) __all__ = [ + "BlockDescriptor", + "BlockDevice", + "BlockIO", + "BlockIOBlocks", + "CharacterDevice", + "ChecksumError", + "Directory", + "DirectoryEntry", + "DirectoryEntry2", + "DirectoryEntryHash", + "DirectoryEntryTail", + "DotDirectoryEntry2", "DX_HASH", + "DXEntry", + "DXRoot", + "DXRootInfo", "EXT2_FLAGS", "EXT4_BG", "EXT4_CHKSUM", "EXT4_DEFM", + "EXT4_DIR_PAD", + "EXT4_DIR_ROUND", "EXT4_ERRORS", "EXT4_FEATURE_COMPAT", "EXT4_FEATURE_INCOMPAT", @@ -87,55 +113,39 @@ "EXT4_FS", "EXT4_FT", "EXT4_INO", + "EXT4_MAX_REC_LEN", "EXT4_MOUNT", "EXT4_MOUNT2", + "EXT4_NAME_LEN", "EXT4_OS", "EXT4_REV", - "FS_ENCRYPTION_MODE", - "MODE", - "Superblock", - "BlockDescriptor", - "BlockDevice", - "CharacterDevice", - "Directory", + "ExtendedAttributeEntry", + "ExtendedAttributeError", + "ExtendedAttributeHeader", + "ExtendedAttributeIBodyHeader", + "Extent", + "ExtentBlocks", + "ExtentHeader", + "ExtentIndex", + 
"ExtentTail", "Fifo", "File", + "FS_ENCRYPTION_MODE", "Hurd1", "Hurd2", "Inode", + "InodeError", + "InvalidStreamException", "Linux1", "Linux2", + "MagicError", "Masix1", "Masix2", + "MODE", "Osd1", "Osd2", "Socket", + "Superblock", "SymbolicLink", "Volume", - "InvalidStreamException", - "Extent", - "ExtentBlocks", - "ExtentHeader", - "ExtentIndex", - "ExtentTail", - "MagicError", - "ChecksumError", - "BlockIO", - "BlockIOBlocks", - "DirectoryEntry", - "DirectoryEntry2", - "DirectoryEntryTail", - "DirectoryEntryHash", - "EXT4_NAME_LEN", - "EXT4_DIR_PAD", - "EXT4_DIR_ROUND", - "EXT4_MAX_REC_LEN", - "ExtendedAttributeError", - "ExtendedAttributeIBodyHeader", - "ExtendedAttributeHeader", - "ExtendedAttributeEntry", - "DXRoot", - "DotDirectoryEntry2", - "DXEntry", - "DXRootInfo", ] diff --git a/ext4/_compat.py b/ext4/_compat.py index 3d3c328..8b9d711 100644 --- a/ext4/_compat.py +++ b/ext4/_compat.py @@ -1,18 +1,22 @@ +# pyright: reportUnnecessaryTypeIgnoreComment=false +# pyright: reportIgnoreCommentWithoutRule=false +# pyright: reportUnreachable=false +# pyright: reportExplicitAny=false +# pyright: reportAny=false import os -from typing import Protocol -from typing import runtime_checkable -from typing import TypeVar -from typing import Any +import sys +from typing import ( + Any, + Protocol, + TypeVar, + runtime_checkable, +) -# Added in python 3.12 -try: - from typing import override # pyright: ignore[reportAssignmentType] +if sys.version_info < (3, 12): + from typing_extensions import override -except ImportError: - from typing import Callable - - def override(fn: Callable[..., Any]): # pyright: ignore[reportExplicitAny] - return fn +else: + from typing import override @runtime_checkable @@ -32,8 +36,8 @@ def peek(self, size: int = 0, /) -> bytes: ... 
T = TypeVar("T") -def assert_cast(obj: Any, t: type[T], /) -> T: # pyright: ignore[reportExplicitAny, reportAny] - assert isinstance(obj, t), f"Object is: {type(obj)} not {t}" # pyright: ignore[reportAny] +def assert_cast(obj: Any, t: type[T], /) -> T: + assert isinstance(obj, t), f"Object is: {type(obj)} not {t}" return obj diff --git a/ext4/block.py b/ext4/block.py index 9310937..07948d4 100644 --- a/ext4/block.py +++ b/ext4/block.py @@ -1,36 +1,38 @@ # pyright: reportImportCycles=false -import io import errno +import io +import os +from typing import TYPE_CHECKING from ._compat import override -from typing import TYPE_CHECKING - if TYPE_CHECKING: + from .extent import Extent from .inode import Inode + from .volume import Volume -class BlockIOBlocks(object): - def __init__(self, blockio: "BlockIO"): +class BlockIOBlocks: + def __init__(self, blockio: "BlockIO") -> None: self.blockio: BlockIO = blockio self._null_block: bytearray = bytearray(self.block_size) @property - def block_size(self): + def block_size(self) -> int: return self.blockio.block_size @property - def volume(self): + def volume(self) -> "Volume": return self.blockio.inode.volume - def __contains__(self, ee_block: int): + def __contains__(self, ee_block: int) -> bool: for extent in self.blockio.extents: if ee_block in extent.blocks: return True return False - def __getitem__(self, ee_block: int): + def __getitem__(self, ee_block: int) -> bytearray | bytes: for extent in self.blockio.extents: if ee_block not in extent.blocks: continue @@ -41,17 +43,17 @@ def __getitem__(self, ee_block: int): class BlockIO(io.RawIOBase): - def __init__(self, inode: "Inode"): + def __init__(self, inode: "Inode") -> None: super().__init__() - self.inode: "Inode" = inode + self.inode: Inode = inode self.cursor: int = 0 self.blocks: BlockIOBlocks = BlockIOBlocks(self) - def __len__(self): + def __len__(self) -> int: return self.inode.i_size @property - def extents(self): + def extents(self) -> "list[Extent]": return 
self.inode.extents @property @@ -78,7 +80,7 @@ def seek(self, offset: int, mode: int = io.SEEK_SET) -> int: raise NotImplementedError() if offset < 0: - raise OSError(errno.EINVAL, "Invalid argument") + raise OSError(errno.EINVAL, os.strerror(errno.EINVAL)) self.cursor = offset return offset diff --git a/ext4/blockdescriptor.py b/ext4/blockdescriptor.py index e36eebc..cf87af3 100644 --- a/ext4/blockdescriptor.py +++ b/ext4/blockdescriptor.py @@ -1,14 +1,20 @@ # pyright: reportImportCycles=false -from ctypes import c_uint32 -from ctypes import c_uint16 +from ctypes import ( + c_uint16, + c_uint32, +) +from typing import ( + TYPE_CHECKING, + final, +) -from typing import final -from typing import TYPE_CHECKING - -from .enum import EXT4_BG -from .struct import Ext4Struct -from .struct import crc32c from ._compat import assert_cast +from .enum import EXT4_BG +from .struct import ( + Ext4Struct, + crc32c, +) +from .superblock import Superblock if TYPE_CHECKING: from .volume import Volume @@ -44,7 +50,7 @@ class BlockDescriptor(Ext4Struct): ("bg_reserved", c_uint32), ] - def __init__(self, volume: "Volume", offset: int, bg_no: int): + def __init__(self, volume: "Volume", offset: int, bg_no: int) -> None: super().__init__(volume, offset) self.bg_no: int = bg_no @@ -139,11 +145,11 @@ def bg_inode_table(self) -> int: return bg_inode_table_lo @property - def superblock(self): + def superblock(self) -> Superblock: return self.volume.superblock @Ext4Struct.checksum.getter - def checksum(self): + def checksum(self) -> int: csum = crc32c(self.bg_no.to_bytes(4, "little"), self.volume.seed) csum = crc32c(bytes(self)[: BlockDescriptor.bg_checksum.offset], csum) if self.volume.has_hi: diff --git a/ext4/directory.py b/ext4/directory.py index 9332dc3..42b4782 100644 --- a/ext4/directory.py +++ b/ext4/directory.py @@ -1,18 +1,23 @@ # pyright: reportImportCycles=false -from ctypes import c_uint32 -from ctypes import c_uint16 -from ctypes import c_uint8 -from ctypes import c_char -from 
ctypes import memmove -from ctypes import addressof - -from typing import final -from typing import TYPE_CHECKING - -from .struct import Ext4Struct +from ctypes import ( + addressof, + c_char, + c_uint8, + c_uint16, + c_uint32, + memmove, +) +from typing import ( + TYPE_CHECKING, + final, +) + +from ._compat import ( + assert_cast, + override, +) from .enum import EXT4_FT -from ._compat import override -from ._compat import assert_cast +from .struct import Ext4Struct if TYPE_CHECKING: from .inode import Directory @@ -24,12 +29,12 @@ class DirectoryEntryStruct(Ext4Struct): - def __init__(self, directory: "Directory", offset: int): - self.directory: "Directory" = directory + def __init__(self, directory: "Directory", offset: int) -> None: + self.directory: Directory = directory super().__init__(directory.volume, offset) @override - def read_from_volume(self): + def read_from_volume(self) -> None: data = self.directory._open().read()[self.offset : self.offset + self.size] # pyright: ignore[reportPrivateUsage] _ = memmove(addressof(self), data, self.size) diff --git a/ext4/enum.py b/ext4/enum.py index 77f5057..aa11e24 100644 --- a/ext4/enum.py +++ b/ext4/enum.py @@ -1,12 +1,15 @@ # pyright: reportImportCycles=false -from ctypes import c_uint8 -from ctypes import c_uint16 -from ctypes import c_uint32 - -from typing import cast -from typing import Any -from typing import final -from typing import TYPE_CHECKING +from ctypes import ( + c_uint8, + c_uint16, + c_uint32, +) +from typing import ( + TYPE_CHECKING, + Any, + cast, + final, +) from ._compat import override @@ -14,9 +17,9 @@ from .struct import SimpleCData -def TypedEnumerationType(_type: type["SimpleCData"]): - class EnumerationType(type(_type)): # type: ignore # pyright: ignore[reportGeneralTypeIssues, reportUntypedBaseClass] - def __new__(cls, name: str, bases: tuple[type, ...], data: dict[str, Any]): # pyright: ignore[reportExplicitAny, reportUnknownParameterType] +def TypedEnumerationType(_type: 
type["SimpleCData"]): # noqa: ANN201 + class EnumerationType(type(_type)): # type: ignore # pyright: ignore[reportGeneralTypeIssues, reportUntypedBaseClass] #noqa: ANN201 + def __new__(cls, name: str, bases: tuple[type, ...], data: dict[str, Any]): # pyright: ignore[reportExplicitAny, reportUnknownParameterType] # noqa: ANN204 _members_: dict[str, Any] # pyright: ignore[reportExplicitAny] if "_members_" not in data: _members_ = {} @@ -30,25 +33,25 @@ def __new__(cls, name: str, bases: tuple[type, ...], data: dict[str, Any]): # p _members_ = cast(dict[str, Any], data["_members_"]) # pyright: ignore[reportExplicitAny] data["_reverse_map_"] = {v: k for k, v in _members_.items()} # pyright: ignore[reportAny] - cls = type(_type).__new__(cls, name, bases, data) # pyright: ignore[reportCallIssue, reportUnknownVariableType] + cls = type(_type).__new__(cls, name, bases, data) # pyright: ignore[reportCallIssue, reportUnknownVariableType] # noqa: PLW0642 for key, value in cast(dict[str, Any], cls._members_).items(): # pyright: ignore[reportExplicitAny, reportAny] globals()[key] = value return cls # pyright: ignore[reportUnknownVariableType] @override - def __repr__(self): + def __repr__(self) -> str: return f"" # pyright: ignore[reportUnknownMemberType] return EnumerationType -def TypedCEnumeration(_type: type["SimpleCData"]): - class CEnumeration(_type, metaclass=TypedEnumerationType(_type)): # pyright: ignore[reportGeneralTypeIssues, reportUntypedBaseClass] +def TypedCEnumeration(_type: type["SimpleCData"]): # noqa: ANN201 + class CEnumeration(_type, metaclass=TypedEnumerationType(_type)): # pyright: ignore[reportGeneralTypeIssues, reportUntypedBaseClass] # noqa: ANN201,PLW1641,PLW1641 _members_: dict[str, Any] = {} # pyright: ignore[reportExplicitAny] @override - def __repr__(self): + def __repr__(self) -> str: value = self.value # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] return f"<{self.__class__.__name__}.{self._reverse_map_.get(value, 
'(unknown)')}: {value}>" # pyright: ignore[reportUnknownMemberType] diff --git a/ext4/extent.py b/ext4/extent.py index 0accccf..397d5eb 100644 --- a/ext4/extent.py +++ b/ext4/extent.py @@ -1,24 +1,29 @@ # pyright: reportImportCycles=false -from ctypes import c_uint32 -from ctypes import c_uint16 -from ctypes import sizeof - -from typing import final -from typing import TYPE_CHECKING - -from .struct import crc32c -from .struct import Ext4Struct +from collections.abc import Iterator +from ctypes import ( + c_uint16, + c_uint32, + sizeof, +) +from typing import ( + TYPE_CHECKING, + final, +) from ._compat import assert_cast +from .struct import ( + Ext4Struct, + crc32c, +) if TYPE_CHECKING: from .inode import Inode from .volume import Volume -class ExtentBlocks(object): - def __init__(self, extent: "Extent"): - self.extent: "Extent" = extent +class ExtentBlocks: + def __init__(self, extent: "Extent") -> None: + self.extent: Extent = extent self._null_block: bytearray = bytearray(self.block_size) @property @@ -51,7 +56,7 @@ def is_initialized(self) -> bool: def __contains__(self, ee_block: int) -> bool: return self.ee_block <= ee_block < self.ee_block + self.ee_len - def __getitem__(self, ee_block: int): + def __getitem__(self, ee_block: int) -> bytearray | bytes: block_size = self.block_size if not self.is_initialized or ee_block not in self: # Uninitialized @@ -61,10 +66,10 @@ def __getitem__(self, ee_block: int): _ = self.volume.seek(disk_block * block_size) return self.volume.read(block_size) - def __iter__(self): + def __iter__(self) -> Iterator[int]: return iter(range(self.ee_block, self.ee_block + self.ee_len)) - def __len__(self): + def __len__(self) -> int: return self.ee_len @@ -80,7 +85,7 @@ class ExtentHeader(Ext4Struct): ("eh_generation", c_uint32), ] - def __init__(self, tree: "ExtentTree", offset: int): + def __init__(self, tree: "ExtentTree", offset: int) -> None: self.tree: ExtentTree = tree super().__init__(self.inode.volume, offset) @@ -113,7 +118,7 
@@ def inode(self) -> "Inode": return self.tree.inode @Ext4Struct.expected_magic.getter - def expected_magic(self): + def expected_magic(self) -> int: return 0xF30A @Ext4Struct.magic.getter @@ -133,7 +138,7 @@ def expected_checksum(self) -> int | None: return et_checksum @property - def seed(self): + def seed(self) -> int: return self.inode.seed @Ext4Struct.checksum.getter @@ -158,7 +163,7 @@ class ExtentIndex(Ext4Struct): ("ei_unused", c_uint16), ] - def __init__(self, header: ExtentHeader, offset: int, ei_no: int): + def __init__(self, header: ExtentHeader, offset: int, ei_no: int) -> None: self.ei_no: int = ei_no self.header: ExtentHeader = header super().__init__(self.inode.volume, offset) @@ -170,11 +175,11 @@ def ei_leaf(self) -> int: return ei_leaf_hi << 32 | ei_leaf_lo @property - def tree(self): + def tree(self) -> "ExtentTree": return self.header.tree @property - def inode(self): + def inode(self) -> "Inode": return self.tree.inode @@ -189,7 +194,7 @@ class Extent(Ext4Struct): ("ee_start_lo", c_uint32), ] - def __init__(self, header: ExtentHeader, offset: int, ee_no: int): + def __init__(self, header: ExtentHeader, offset: int, ee_no: int) -> None: super().__init__(header.inode.volume, offset) self.ee_no: int = ee_no self.header: ExtentHeader = header @@ -232,7 +237,7 @@ class ExtentTail(Ext4Struct): ("et_checksum", c_uint32), ] - def __init__(self, header: ExtentHeader, offset: int): + def __init__(self, header: ExtentHeader, offset: int) -> None: self.header: ExtentHeader = header super().__init__(self.inode.volume, offset) @@ -245,9 +250,9 @@ def inode(self) -> "Inode": return self.tree.inode -class ExtentTree(object): - def __init__(self, inode: "Inode"): - self.inode: "Inode" = inode +class ExtentTree: + def __init__(self, inode: "Inode") -> None: + self.inode: Inode = inode self.headers: list[ExtentHeader] = [] if not self.has_extents: return diff --git a/ext4/htree.py b/ext4/htree.py index 92a1310..d418d95 100644 --- a/ext4/htree.py +++ 
b/ext4/htree.py @@ -1,25 +1,30 @@ # pyright: reportImportCycles=false import warnings - -from ctypes import c_uint32 -from ctypes import c_uint16 -from ctypes import c_uint8 -from ctypes import c_char -from ctypes import sizeof -from ctypes import addressof -from ctypes import memmove -from ctypes import LittleEndianStructure - -from typing import final -from typing import TYPE_CHECKING - from collections.abc import Generator - -from .struct import Ext4Struct -from .struct import MagicError +from ctypes import ( + LittleEndianStructure, + addressof, + c_char, + c_uint8, + c_uint16, + c_uint32, + memmove, + sizeof, +) +from typing import ( + TYPE_CHECKING, + final, +) + +from ._compat import ( + assert_cast, + override, +) from .enum import DX_HASH -from ._compat import override -from ._compat import assert_cast +from .struct import ( + Ext4Struct, + MagicError, +) if TYPE_CHECKING: from .inode import Directory @@ -27,9 +32,9 @@ class LittleEndianStructureWithVolume(LittleEndianStructure): - def __init__(self): + def __init__(self) -> None: super().__init__() - self._volume: "Volume | None" = None + self._volume: Volume | None = None @property def volume(self) -> "Volume": @@ -84,12 +89,12 @@ class DXRootInfo(LittleEndianStructure): class DXBase(Ext4Struct): - def __init__(self, directory: "Directory", offset: int): - self.directory: "Directory" = directory + def __init__(self, directory: "Directory", offset: int) -> None: + self.directory: Directory = directory super().__init__(directory.volume, offset) @override - def read_from_volume(self): + def read_from_volume(self) -> None: reader = self.directory._open() # pyright: ignore[reportPrivateUsage] _ = reader.seek(self.offset) data = reader.read(sizeof(self)) @@ -105,7 +110,7 @@ class DXEntry(DXBase): ("block", c_uint32), ] - def __init__(self, parent: "DXEntriesBase", index: int): + def __init__(self, parent: "DXEntriesBase", index: int) -> None: self.index: int = index self.parent: DXEntriesBase = parent 
super().__init__( @@ -116,7 +121,7 @@ def __init__(self, parent: "DXEntriesBase", index: int): class DXEntriesBase(DXBase): @override - def read_from_volume(self): + def read_from_volume(self) -> None: super().read_from_volume() @property @@ -149,7 +154,7 @@ class DXRoot(DXEntriesBase): # ("entries", DXEntry * self.count), ] - def __init__(self, inode: "Directory"): + def __init__(self, inode: "Directory") -> None: super().__init__(inode, 0) @@ -163,7 +168,7 @@ class DXFake(LittleEndianStructure): ] @property - def expected_magic(self): + def expected_magic(self) -> int: return 0 @property @@ -186,7 +191,7 @@ class DXNode(DXEntriesBase): # ("entries", DXEntry * self.count), ] - def __init__(self, directory: "Directory", offset: int): + def __init__(self, directory: "Directory", offset: int) -> None: super().__init__(directory, offset) @@ -199,7 +204,7 @@ class DXTail(DXBase): ("dt_checksum", c_uint16), ] - def __init__(self, parent: DXNode): + def __init__(self, parent: DXNode) -> None: self.parent = parent count = assert_cast(parent.count, int) # pyright: ignore[reportAny] super().__init__( diff --git a/ext4/inode.py b/ext4/inode.py index 8f06217..ab2d52a 100644 --- a/ext4/inode.py +++ b/ext4/inode.py @@ -1,57 +1,66 @@ # pyright: reportImportCycles=false from __future__ import annotations +import errno import io import os -import errno import warnings - -from ctypes import LittleEndianStructure -from ctypes import Union -from ctypes import c_uint32 -from ctypes import c_uint16 -from ctypes import sizeof - -from typing import cast -from typing import final -from typing import Any -from typing import TYPE_CHECKING - -from cachetools import cachedmethod -from cachetools import LRUCache - -from ._compat import override -from ._compat import ReadableStream -from ._compat import assert_cast - from collections.abc import Generator - -from .struct import crc32c -from .struct import Ext4Struct -from .struct import MagicError - -from .enum import EXT4_OS -from .enum import 
EXT4_FL -from .enum import EXT4_FEATURE_INCOMPAT -from .enum import MODE -from .enum import EXT4_FT - -from .extent import Extent -from .extent import ExtentHeader -from .extent import ExtentIndex -from .extent import ExtentTree - +from ctypes import ( + LittleEndianStructure, + Union, + c_uint16, + c_uint32, + sizeof, +) +from typing import ( + TYPE_CHECKING, + Any, + cast, + final, +) + +from cachetools import ( + LRUCache, + cachedmethod, +) + +from ._compat import ( + ReadableStream, + assert_cast, + override, +) from .block import BlockIO - -from .directory import DirectoryEntry -from .directory import DirectoryEntry2 -from .directory import DirectoryEntryHash -from .directory import EXT4_DIR_ROUND - +from .directory import ( + EXT4_DIR_ROUND, + DirectoryEntry, + DirectoryEntry2, + DirectoryEntryHash, +) +from .enum import ( + EXT4_FEATURE_INCOMPAT, + EXT4_FL, + EXT4_FT, + EXT4_OS, + MODE, +) +from .extent import ( + Extent, + ExtentHeader, + ExtentIndex, + ExtentTree, +) from .htree import DXRoot - -from .xattr import ExtendedAttributeIBodyHeader -from .xattr import ExtendedAttributeHeader +from .struct import ( + Ext4Struct, + MagicError, + crc32c, +) +from .superblock import Superblock +from .xattr import ( + ExtendedAttributeHeader, + ExtendedAttributeIBodyHeader, +) if TYPE_CHECKING: from .volume import Volume @@ -182,42 +191,75 @@ class Inode(Ext4Struct): ("i_projid", c_uint32), ] - def __new__(cls, volume: "Volume", offset: int, i_no: int): - if cls is not Inode: - return super().__new__(cls) - + @classmethod + def get_file_type(cls, volume: Volume, offset: int) -> EXT4_FT: _ = volume.seek(offset + Inode.i_mode.offset) - file_type: MODE = cast( + file_type = cast( MODE, Inode.field_type("i_mode").from_buffer_copy( # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType, reportOptionalMemberAccess] volume.read(Inode.i_mode.size) ) & 0xF000, ) - if file_type == MODE.IFIFO: - return super().__new__(Fifo) # pyright: ignore[reportArgumentType] 
+ match file_type: + case MODE.IFIFO: + return EXT4_FT.FIFO # pyright: ignore[reportReturnType] - if file_type == MODE.IFDIR: - return super().__new__(Directory) # pyright: ignore[reportArgumentType] + case MODE.IFCHR: + return EXT4_FT.CHRDEV # pyright: ignore[reportReturnType] - if file_type == MODE.IFREG: - return super().__new__(File) # pyright: ignore[reportArgumentType] + case MODE.IFDIR: + return EXT4_FT.DIR # pyright: ignore[reportReturnType] - if file_type == MODE.IFLNK: - return super().__new__(SymbolicLink) # pyright: ignore[reportArgumentType] + case MODE.IFBLK: + return EXT4_FT.BLKDEV # pyright: ignore[reportReturnType] - if file_type == MODE.IFCHR: - return super().__new__(CharacterDevice) # pyright: ignore[reportArgumentType] + case MODE.IFREG: + return EXT4_FT.REG_FILE # pyright: ignore[reportReturnType] + + case MODE.IFLNK: + return EXT4_FT.SYMLINK # pyright: ignore[reportReturnType] + + case MODE.IFSOCK: + return EXT4_FT.SOCK # pyright: ignore[reportReturnType] + + case _: + return EXT4_FT.UNKNOWN # pyright: ignore[reportReturnType] + + def __new__(cls, volume: Volume, offset: int, i_no: int) -> Inode: + if cls is not Inode: + return super().__new__(cls) - if file_type == MODE.IFBLK: - return super().__new__(BlockDevice) # pyright: ignore[reportArgumentType] + file_type = cls.get_file_type(volume, offset) + match file_type: + case EXT4_FT.FIFO: + return super().__new__(Fifo) # pyright: ignore[reportArgumentType] - if file_type == MODE.IFSOCK: - return super().__new__(Socket) # pyright: ignore[reportArgumentType] + case EXT4_FT.DIR: + return super().__new__(Directory) # pyright: ignore[reportArgumentType] - raise InodeError(f"Unknown file type 0x{file_type:X}") + case EXT4_FT.REG_FILE: + return super().__new__(File) # pyright: ignore[reportArgumentType] - def __init__(self, volume: "Volume", offset: int, i_no: int): + case EXT4_FT.SYMLINK: + return super().__new__(SymbolicLink) # pyright: ignore[reportArgumentType] + + case EXT4_FT.CHRDEV: + return 
super().__new__(CharacterDevice) # pyright: ignore[reportArgumentType] + + case EXT4_FT.BLKDEV: + return super().__new__(BlockDevice) # pyright: ignore[reportArgumentType] + + case EXT4_FT.SOCK: + return super().__new__(Socket) # pyright: ignore[reportArgumentType] + + case EXT4_FT.UNKNOWN: + return super().__new__(UnknownInode) # pyright: ignore[reportArgumentType] + + case _: + raise InodeError(f"Unknown file type 0x{file_type:X}") + + def __init__(self, volume: Volume, offset: int, i_no: int) -> None: self.i_no: int = i_no super().__init__(volume, offset) self.tree: ExtentTree | None = ExtentTree(self) @@ -233,11 +275,11 @@ def extra_inode_data(self) -> bytes: return self.volume.read(self.superblock.s_inode_size - size) # pyright: ignore[reportAny] @property - def superblock(self): + def superblock(self) -> Superblock: return self.volume.superblock @property - def block_size(self): + def block_size(self) -> int: return self.volume.block_size @property @@ -411,6 +453,10 @@ def xattrs( raise +class UnknownInode(Inode): + pass + + class Fifo(Inode): pass @@ -436,12 +482,12 @@ def open( class SymbolicLink(Inode): - def readlink(self): + def readlink(self) -> bytes: return self._open().read() class Directory(Inode): - def __init__(self, volume: "Volume", offset: int, i_no: int): + def __init__(self, volume: Volume, offset: int, i_no: int) -> None: super().__init__(volume, offset, i_no) self._inode_at_cache: LRUCache[str | bytes, Inode] = LRUCache(maxsize=32) self._dirents: None | list[DirectoryEntry | DirectoryEntry2] = None @@ -450,12 +496,12 @@ def __init__(self, volume: "Volume", offset: int, i_no: int): self.htree = DXRoot(self) @override - def verify(self): + def verify(self) -> None: super().verify() # TODO verify DirectoryEntryHash? Or should this be in validate? 
@override - def validate(self): + def validate(self) -> None: super().validate() # TODO validate each directory entry block with DirectoryEntryTail @@ -528,41 +574,19 @@ def _opendir( self._dirents = dirents + def _is_valid_file_type(self, file_type: EXT4_FT) -> bool: + return file_type != EXT4_FT.UNKNOWN and file_type < EXT4_FT.MAX + def _get_file_type(self, dirent: DirectoryEntry | DirectoryEntry2) -> EXT4_FT: dirent_inode = assert_cast(dirent.inode, int) # pyright: ignore[reportAny] offset = self.volume.inodes.offset(dirent_inode) - _ = self.volume.seek(offset + Inode.i_mode.offset) - file_type = cast( - MODE, - Inode.field_type("i_mode").from_buffer_copy( # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType, reportOptionalMemberAccess] - self.volume.read(Inode.i_mode.size) + file_type = self.get_file_type(self.volume, offset) + if not self._is_valid_file_type(file_type): + raise OpenDirectoryError( + f"Unexpected file type {file_type} for inode {dirent_inode}" ) - & 0xF000, - ) - if file_type == MODE.IFIFO: - return EXT4_FT.FIFO # pyright: ignore[reportReturnType] - - if file_type == MODE.IFCHR: - return EXT4_FT.CHRDEV # pyright: ignore[reportReturnType] - - if file_type == MODE.IFDIR: - return EXT4_FT.DIR # pyright: ignore[reportReturnType] - if file_type == MODE.IFBLK: - return EXT4_FT.BLKDEV # pyright: ignore[reportReturnType] - - if file_type == MODE.IFREG: - return EXT4_FT.REG_FILE # pyright: ignore[reportReturnType] - - if file_type == MODE.IFLNK: - return EXT4_FT.SYMLINK # pyright: ignore[reportReturnType] - - if file_type == MODE.IFSOCK: - return EXT4_FT.SOCK # pyright: ignore[reportReturnType] - - raise OpenDirectoryError( - f"Unexpected file type {file_type} for inode {dirent_inode}" - ) + return file_type def opendir( self, @@ -573,7 +597,7 @@ def opendir( if file_type == EXT4_FT.DIR_CSUM: continue - if file_type == EXT4_FT.UNKNOWN or file_type > EXT4_FT.MAX: + if not self._is_valid_file_type(file_type): raise 
OpenDirectoryError(f"Unexpected file type: {file_type}") else: diff --git a/ext4/struct.py b/ext4/struct.py index 326a10b..92e4883 100644 --- a/ext4/struct.py +++ b/ext4/struct.py @@ -1,15 +1,22 @@ # pyright: reportImportCycles=false -import warnings import ctypes +import errno +import warnings +from collections.abc import Callable +from ctypes import ( + LittleEndianStructure, + addressof, + memmove, + sizeof, +) +from typing import ( + TYPE_CHECKING, + cast, +) -from ctypes import LittleEndianStructure -from ctypes import memmove -from ctypes import addressof -from ctypes import sizeof -from crcmod import mkCrcFun # pyright: ignore[reportMissingTypeStubs, reportUnknownVariableType] -from typing import cast -from typing import Callable -from typing import TYPE_CHECKING +from crcmod import ( + mkCrcFun, # pyright: ignore[reportUnknownVariableType] +) if TYPE_CHECKING: from .volume import Volume @@ -68,9 +75,9 @@ def to_hex(data: int | list[int] | bytes | None) -> str: class Ext4Struct(LittleEndianStructure): - def __init__(self, volume: "Volume", offset: int): + def __init__(self, volume: "Volume", offset: int) -> None: super().__init__() - self.volume: "Volume" = volume + self.volume: Volume = volume self.offset: int = offset self.read_from_volume() self.verify() @@ -85,41 +92,42 @@ def field_type(cls, name: str) -> SimpleCData | None: return None - def read_from_volume(self): + def read_from_volume(self) -> None: _ = self.volume.seek(self.offset) data = self.volume.read(sizeof(self)) if len(data) != sizeof(self): raise OSError( - f"Short read for {type(self).__name__} at offset {self.offset}" + errno.EIO, + f"Short read for {type(self).__name__} at offset {self.offset}", ) _ = memmove(addressof(self), data, sizeof(self)) @property - def size(self): + def size(self) -> int: return sizeof(self) @property - def magic(self): + def magic(self) -> int | None: return None @property - def expected_magic(self) -> None: + def expected_magic(self) -> int | None: return None 
@property - def checksum(self): + def checksum(self) -> int | None: return None @property - def expected_checksum(self): + def expected_checksum(self) -> int | None: return None @property - def ignore_magic(self): + def ignore_magic(self) -> bool: return self.volume.ignore_magic - def verify(self): + def verify(self) -> None: """ Verify magic numbers """ @@ -136,7 +144,7 @@ def verify(self): warnings.warn(message, RuntimeWarning) - def validate(self): + def validate(self) -> None: """ Validate data checksums """ diff --git a/ext4/superblock.py b/ext4/superblock.py index f90dbf0..635c54e 100644 --- a/ext4/superblock.py +++ b/ext4/superblock.py @@ -1,31 +1,36 @@ # pyright: reportImportCycles=false -from ctypes import c_uint64 -from ctypes import c_uint32 -from ctypes import c_uint16 -from ctypes import c_uint8 -from ctypes import c_ubyte - -from typing import final -from typing import TYPE_CHECKING - -from .enum import EXT4_FS -from .enum import EXT4_ERRORS -from .enum import EXT4_OS -from .enum import EXT4_REV -from .enum import EXT4_FEATURE_COMPAT -from .enum import EXT4_FEATURE_INCOMPAT -from .enum import EXT4_FEATURE_RO_COMPAT -from .enum import DX_HASH -from .enum import EXT4_DEFM -from .enum import EXT2_FLAGS -from .enum import EXT4_CHKSUM -from .enum import EXT4_MOUNT -from .enum import FS_ENCRYPTION_MODE - -from .struct import Ext4Struct -from .struct import crc32c +from ctypes import ( + c_ubyte, + c_uint8, + c_uint16, + c_uint32, + c_uint64, +) +from typing import ( + TYPE_CHECKING, + final, +) from ._compat import assert_cast +from .enum import ( + DX_HASH, + EXT2_FLAGS, + EXT4_CHKSUM, + EXT4_DEFM, + EXT4_ERRORS, + EXT4_FEATURE_COMPAT, + EXT4_FEATURE_INCOMPAT, + EXT4_FEATURE_RO_COMPAT, + EXT4_FS, + EXT4_MOUNT, + EXT4_OS, + EXT4_REV, + FS_ENCRYPTION_MODE, +) +from .struct import ( + Ext4Struct, + crc32c, +) if TYPE_CHECKING: from .volume import Volume @@ -132,7 +137,7 @@ class Superblock(Ext4Struct): ("s_checksum", c_uint32), ] - def __init__(self, volume: 
"Volume", _=None): + def __init__(self, volume: "Volume", _=None) -> None: super().__init__(volume, 0x400) @property diff --git a/ext4/volume.py b/ext4/volume.py index 39dda12..908958a 100644 --- a/ext4/volume.py +++ b/ext4/volume.py @@ -1,30 +1,35 @@ from __future__ import annotations +import errno import io import os -import errno - -from uuid import UUID from pathlib import PurePosixPath +from uuid import UUID -from cachetools import cachedmethod -from cachetools import LRUCache +from cachetools import ( + LRUCache, + cachedmethod, +) -from ._compat import PeekableStream -from ._compat import assert_cast +from ._compat import ( + PeekableStream, + assert_cast, +) +from .blockdescriptor import BlockDescriptor from .enum import EXT4_INO +from .inode import ( + Directory, + Inode, +) from .superblock import Superblock -from .inode import Inode -from .inode import Directory -from .blockdescriptor import BlockDescriptor class InvalidStreamException(Exception): pass -class Inodes(object): - def __init__(self, volume: "Volume"): +class Inodes: + def __init__(self, volume: Volume) -> None: self.volume: Volume = volume self._group_cache: dict[int, tuple[int, int]] = {} self._offset_cache: LRUCache[int, int] = LRUCache(maxsize=32) @@ -55,12 +60,12 @@ def offset(self, index: int) -> int: return table_offset + table_entry_index * s_inode_size @cachedmethod(lambda self: self._getitem_cache) # pyright: ignore[reportAny] - def __getitem__(self, index: int): + def __getitem__(self, index: int) -> Inode: offset = self.offset(index) return Inode(self.volume, offset, index) -class Volume(object): +class Volume: def __init__( self, stream: PeekableStream, @@ -69,7 +74,7 @@ def __init__( ignore_magic: bool = False, ignore_checksum: bool = False, ignore_attr_name_index: bool = False, - ): + ) -> None: errors: list[str] = [] for name in ("read", "peek", "tell", "seek"): if not hasattr(stream, name): @@ -110,12 +115,12 @@ def __init__( self.inodes: Inodes = Inodes(self) 
self._inode_at_cache: LRUCache[str | bytes, Inode] = LRUCache(maxsize=32) - def __len__(self): + def __len__(self) -> int: _ = self.stream.seek(0, io.SEEK_END) return self.stream.tell() - self.offset @property - def bad_blocks(self): + def bad_blocks(self) -> Inode: return self.inodes[EXT4_INO.BAD] @property @@ -123,23 +128,23 @@ def root(self) -> Directory: return assert_cast(self.inodes[EXT4_INO.ROOT], Directory) @property - def user_quota(self): + def user_quota(self) -> Inode: return self.inodes[EXT4_INO.USR_QUOTA] @property - def group_quota(self): + def group_quota(self) -> Inode: return self.inodes[EXT4_INO.GRP_QUOTA] @property - def boot_loader(self): + def boot_loader(self) -> Inode: return self.inodes[EXT4_INO.BOOT_LOADER] @property - def undelete_directory(self): + def undelete_directory(self) -> Inode: return self.inodes[EXT4_INO.UNDEL_DIR] @property - def journal(self): + def journal(self) -> Inode: return self.inodes[EXT4_INO.JOURNAL] @property @@ -147,12 +152,12 @@ def has_hi(self) -> int: return self.superblock.has_hi @property - def uuid(self): + def uuid(self) -> UUID: s_uuid = assert_cast(bytes(self.superblock.s_uuid), bytes) # pyright: ignore[reportAny] return UUID(bytes=s_uuid) @property - def seed(self): + def seed(self) -> int: return self.superblock.seed @property @@ -197,7 +202,7 @@ def peek(self, size: int) -> bytes: def tell(self) -> int: return self.cursor - def block_read(self, index: int, count: int = 1): + def block_read(self, index: int, count: int = 1) -> bytes: assert index >= 0 assert count > 0 block_size = self.block_size # Only calculate once diff --git a/ext4/xattr.py b/ext4/xattr.py index 79c6ddd..e867484 100644 --- a/ext4/xattr.py +++ b/ext4/xattr.py @@ -1,22 +1,28 @@ import warnings - -from ctypes import c_uint32 -from ctypes import c_uint16 -from ctypes import c_uint8 -from ctypes import sizeof - -from typing import final -from typing import TYPE_CHECKING - from collections.abc import Generator - -from .enum import EXT4_FL 
-from .enum import EXT4_FEATURE_INCOMPAT - -from .struct import Ext4Struct -from .struct import crc32c - -from ._compat import assert_cast, override +from ctypes import ( + c_uint8, + c_uint16, + c_uint32, + sizeof, +) +from typing import ( + TYPE_CHECKING, + final, +) + +from ._compat import ( + assert_cast, + override, +) +from .enum import ( + EXT4_FEATURE_INCOMPAT, + EXT4_FL, +) +from .struct import ( + Ext4Struct, + crc32c, +) if TYPE_CHECKING: from .inode import Inode @@ -27,8 +33,8 @@ class ExtendedAttributeError(Exception): class ExtendedAttributeBase(Ext4Struct): - def __init__(self, inode: "Inode", offset: int, size: int): - self.inode: "Inode" = inode + def __init__(self, inode: "Inode", offset: int, size: int) -> None: + self.inode: Inode = inode self.data_size: int = size super().__init__(inode.volume, offset) @@ -41,7 +47,7 @@ class ExtendedAttributeIBodyHeader(ExtendedAttributeBase): ] @ExtendedAttributeBase.ignore_magic.getter - def ignore_magic(self): + def ignore_magic(self) -> bool: return False @ExtendedAttributeBase.magic.getter @@ -50,7 +56,7 @@ def magic(self) -> int: return h_magic @ExtendedAttributeBase.expected_magic.getter - def expected_magic(self): + def expected_magic(self) -> int: return 0xEA020000 def value_offset(self, entry: "ExtendedAttributeEntry") -> int: @@ -111,7 +117,7 @@ class ExtendedAttributeHeader(ExtendedAttributeIBodyHeader): ] @override - def verify(self): + def verify(self) -> None: super().verify() h_blocks = assert_cast(self.h_blocks, int) # pyright: ignore[reportAny] if h_blocks != 1: @@ -126,7 +132,7 @@ def value_offset(self, entry: "ExtendedAttributeEntry") -> int: return self.offset + e_value_offs @ExtendedAttributeIBodyHeader.expected_checksum.getter - def expected_checksum(self): + def expected_checksum(self) -> int | None: h_checksum = assert_cast(self.h_checksum, int) # pyright: ignore[reportAny] if not h_checksum: return None @@ -134,7 +140,7 @@ def expected_checksum(self): return h_checksum 
@ExtendedAttributeIBodyHeader.checksum.getter - def checksum(self): + def checksum(self) -> int | None: h_checksum = assert_cast(self.h_checksum, int) # pyright: ignore[reportAny] if not h_checksum: return None @@ -172,7 +178,7 @@ class ExtendedAttributeEntry(ExtendedAttributeBase): ] @override - def read_from_volume(self): + def read_from_volume(self) -> None: super().read_from_volume() e_name_len = assert_cast(self.e_name_len, int) # pyright: ignore[reportAny] self.e_name: bytes = self.volume.stream.read(e_name_len) # pyright: ignore[reportUninitializedInstanceVariable] diff --git a/fuzz.py b/fuzz.py new file mode 100644 index 0000000..dd61da3 --- /dev/null +++ b/fuzz.py @@ -0,0 +1,242 @@ +import os +import random +import string +import subprocess +import sys +import tempfile +import warnings +from collections.abc import Callable +from typing import ( + TYPE_CHECKING, + Any, + cast, +) + +import atheris + +warnings.filterwarnings("ignore") + +EXPECTED_DATA_SIZE = 145 + + +seed_file = os.path.join("corpus", "seed", "seed.bin") +if not os.path.exists(seed_file) or os.stat(seed_file).st_size != EXPECTED_DATA_SIZE: + os.makedirs(os.path.dirname(seed_file), exist_ok=True) + with open(seed_file, "wb") as f: + _ = f.write(b"\x00" * EXPECTED_DATA_SIZE) + +with atheris.instrument_imports(): # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue] + from ext4 import ( + ChecksumError, + Directory, + File, + SymbolicLink, + Volume, + ) + + +def TestOneInput(data: bytes) -> None: + fdp = atheris.FuzzedDataProvider(data) # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownVariableType] + + if TYPE_CHECKING: + fdp.ConsumeIntInRange = cast(Callable[[int, int], int], fdp.ConsumeIntInRange) + fdp.ConsumeInt = cast(Callable[[int], int], fdp.ConsumeInt) + fdp.PickValueInList = cast(Callable[[list[Any]], int], fdp.PickValueInList) # pyright: ignore[reportExplicitAny] + + img_size: int = fdp.ConsumeIntInRange(32, 64) + block_size: int = 
[1024, 2048, 4096][fdp.ConsumeIntInRange(0, 2)] + inode_size: int = [128, 256][fdp.ConsumeIntInRange(0, 1)] + num_dirs: int = fdp.ConsumeIntInRange(2, 20) + num_files: int = fdp.ConsumeIntInRange(5, 50) + num_symlinks: int = fdp.ConsumeIntInRange(0, 10) + num_hardlinks: int = fdp.ConsumeIntInRange(0, 5) + num_xattr_files: int = fdp.ConsumeIntInRange(0, 10) + max_file_size: int = fdp.ConsumeIntInRange(1, 64) + rng_seed: int = fdp.ConsumeInt(1024) + rng = random.Random(rng_seed) # noqa: S311 + + FEATURES = [ + "extent", + "dir_index", + "flex_bg", + "sparse_super", + "64bit", + "metadata_csum", + "huge_file", + "orphan_file", + ] + features = [f for f in FEATURES if fdp.PickValueInList([True, False])] + + with tempfile.TemporaryDirectory(prefix="ext4_fuzz_") as tmpdir: + rootdir = os.path.join(tmpdir, "root") + os.mkdir(rootdir) + dirs: list[str] = [rootdir] + for _ in range(num_dirs): + parent = rng.choice(dirs) + name = "".join( + rng.choice(string.ascii_letters + string.digits) + for _ in range(rng.randint(1, 32)) + ) + path = os.path.join(parent, name) + os.mkdir(path) + dirs.append(path) + + files: list[str] = [] + for _ in range(num_files): + parent = rng.choice(dirs) + name = "".join( + rng.choice(string.ascii_letters + string.digits) + for _ in range(rng.randint(1, 64)) + ) + path = os.path.join(parent, name) + size = rng.randint(1, max_file_size * 1024) + with open(path, "wb") as f: + _ = f.write(rng.randbytes(size)) + + files.append(path) + + targets = files + dirs + for _ in range(num_symlinks): + target = rng.choice(targets) + parent = rng.choice(dirs) + name = "".join( + rng.choice(string.ascii_letters + string.digits) + for _ in range(rng.randint(1, 32)) + ) + os.symlink(target, os.path.join(parent, name)) + + for _ in range(num_hardlinks): + if files: + target = rng.choice(files) + parent = rng.choice(dirs) + name = "".join( + rng.choice(string.ascii_letters + string.digits) + for _ in range(rng.randint(1, 32)) + ) + os.link(target, 
os.path.join(parent, name)) + + for _ in range(num_xattr_files): + if files: + path = rng.choice(files) + for _ in range(rng.randint(1, 5)): + key = f"user.xattr_{rng.randint(1, 10)}" + value = rng.randbytes(rng.randint(8, 64)) + os.setxattr(path, key, value) + + img_path = os.path.join(tmpdir, "image.img") + cmd = [ + "mkfs.ext4", + "-d", + rootdir, + "-I", + str(inode_size), + *(["-O", ",".join(features)] if features else []), + "-b", + str(block_size), + img_path, + f"{img_size}M", + ] + result = subprocess.run(cmd, check=False, capture_output=True) # noqa: S607,S603 + if result.returncode != 0: + raise subprocess.CalledProcessError( + result.returncode, cmd, result.stdout, result.stderr + ) + + try: + with open(img_path, "rb") as f: + volume = Volume( + f, + ignore_checksum=True, + ignore_flags=True, + ignore_magic=True, + ignore_attr_name_index=True, + ) + _ = volume.superblock + for group_descriptor in volume.group_descriptors: + _ = group_descriptor.bg_block_bitmap + + root = volume.root + htree = root.htree + if htree is not None: + for _ in htree.entries: + pass + + for dirent, _ in root.opendir(): + _ = dirent.name_bytes + + for inode in volume.inodes: + try: + inode.validate() + + except ChecksumError: + pass + + _ = inode.extra_inode_data + _ = inode.i_size + _ = inode.i_file_acl + if isinstance(inode, File): + _ = inode.open().read() + + elif isinstance(inode, SymbolicLink): + _ = inode.readlink() + + elif isinstance(inode, Directory): + for _ in inode.opendir(): + pass + + _ = inode.has_filetype + _ = inode.is_htree + _ = inode.is_casefolded + _ = inode.is_encrypted + _ = inode.hash_in_dirent + _ = inode.inode_at("/") + try: + _ = inode.inode_at("/empty") + + except FileNotFoundError: + pass + + for _, _ in inode.xattrs: + pass + + for extent in inode.extents: + _ = extent.is_initialized + _ = extent.len + _ = extent.read() + + for index in inode.indices: + _ = index.ei_leaf + + _ = volume.bad_blocks + _ = volume.boot_loader + _ = volume.journal + _ = 
volume.inode_at("/") + try: + _ = volume.inode_at("/empty") + + except FileNotFoundError: + pass + + finally: + if os.path.exists(img_path): + os.remove(img_path) + + +def custom_mutator(data: bytes, _max_size: int, _seed: int) -> bytes: + if len(data) >= EXPECTED_DATA_SIZE: + return data[:EXPECTED_DATA_SIZE] + + return data + b"\x00" * (EXPECTED_DATA_SIZE - len(data)) + + +argv = [ + sys.argv[0], + "corpus", + "-timeout=30", + f"-max_len={EXPECTED_DATA_SIZE}", + *sys.argv[1:], +] +print("argv: ", end="") +print(argv) +_ = atheris.Setup(argv, TestOneInput, custom_mutator=custom_mutator) +atheris.Fuzz() diff --git a/pyproject.toml b/pyproject.toml index 4483948..04fe251 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,26 @@ classifiers = [ "Topic :: System :: Filesystems", "Topic :: Utilities", ] -dynamic = ["dependencies", "readme"] +dynamic = ["readme"] +dependencies = [ + "cachetools==6.0.0", + "crcmod==1.7", + "typing-extensions==4.15.0; python_version<\"3.12\"", +] + +[project.optional-dependencies] +dev = [ + 'wheel', + 'build', + 'ruff', + 'basedpyright', +] +test = [ + "pytest", +] +fuzz = [ + "atheris", +] [project.urls] Homepage = "https://github.com/Eeems/python-ext4" @@ -37,9 +56,39 @@ packages = [ ] [tool.setuptools.dynamic] -dependencies = {file = ["requirements.txt"]} readme = {file= ["README.md"], content-type = "text/markdown"} [build-system] requires = ["setuptools >= 61.0"] build-backend = "setuptools.build_meta" + +[tool.ruff] +exclude = [".venv", "build"] + +[tool.ruff.lint] +extend-select = [ + "UP", + "PL", + "ANN", + "S", +] +ignore = [ + "PLW0603", + "PLR2004", + "PLR0915", + "PLR0912", + "PLR0911", + "PLR6301", + "PLR0913", + "S101", + "S404", + "S603", + "S607", + "ANN401", + "ANN001", + "ANN003", +] + +[tool.pyright] +exclude = [".venv", "build"] +reportMissingTypeStubs = false diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 0e67294..0000000 --- a/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ 
-cachetools==6.0.0 -crcmod==1.7 diff --git a/test.py b/test.py index 16e4cb1..7f9011a 100644 --- a/test.py +++ b/test.py @@ -3,18 +3,20 @@ import os import sys import traceback -import ext4 - +from collections.abc import Callable from io import BufferedReader -from typing import cast -from typing import Callable -from typing import Any +from typing import ( + Any, + cast, +) + +import ext4 FAILED = False -def test_path_tuple(path: str | bytes, expected: tuple[bytes, ...]): - global FAILED +def test_path_tuple(path: str | bytes, expected: tuple[bytes, ...]) -> None: + global FAILED # noqa: PLW0603 print(f"check Volume.path_tuple({path}): ", end="") try: t = ext4.Volume.path_tuple(path) @@ -30,17 +32,17 @@ def test_path_tuple(path: str | bytes, expected: tuple[bytes, ...]): print(e) -def _eval_or_False(source: str) -> Any: # pyright: ignore[reportExplicitAny, reportAny] +def _eval_or_False(source: str) -> Any: # pyright: ignore[reportExplicitAny, reportAny] # noqa: ANN401 try: - return eval(source) # pyright: ignore[reportAny] + return eval(source) # pyright: ignore[reportAny] # noqa: S307 except Exception: traceback.print_exc() return False -def _assert(source: str, debug: Callable[[], Any] | None = None): # pyright: ignore[reportExplicitAny] - global FAILED +def _assert(source: str, debug: Callable[[], Any] | None = None) -> None: # pyright: ignore[reportExplicitAny] + global FAILED # noqa: PLW0603 print(f"check {source}: ", end="") if _eval_or_False(source): print("pass") @@ -52,8 +54,8 @@ def _assert(source: str, debug: Callable[[], Any] | None = None): # pyright: ig print(f" {debug()}") -def test_magic_error(f: BufferedReader): - global FAILED +def test_magic_error(f: BufferedReader) -> None: + global FAILED # noqa: PLW0603 try: print("check MagicError: ", end="") _ = ext4.Volume(f, offset=0) @@ -70,8 +72,8 @@ def test_magic_error(f: BufferedReader): print(e) -def test_root_inode(volume: ext4.Volume): - global FAILED +def test_root_inode(volume: ext4.Volume) -> 
None: + global FAILED # noqa: PLW0603 try: print("Validate root inode: ", end="") volume.root.validate() @@ -125,6 +127,10 @@ def test_root_inode(volume: ext4.Volume): traceback.print_exc() continue + _assert("volume.superblock is not None") + _assert("volume.bad_blocks is not None") + _assert("volume.boot_loader is not None") + _assert("volume.journal is not None") test_root_inode(volume) _assert('volume.root.inode_at("test.txt") == volume.inode_at("/test.txt")') _assert('volume.root.inode_at("/test.txt") == volume.inode_at("/test.txt")') @@ -168,6 +174,10 @@ def test_root_inode(volume: ext4.Volume): traceback.print_exc() if volume is not None: + _assert("volume.superblock is not None") + _assert("volume.bad_blocks is not None") + _assert("volume.boot_loader is not None") + _assert("volume.journal is not None") test_root_inode(volume) _assert("volume.root.is_htree == True") _assert("volume.root.htree is not None") diff --git a/test.sh b/test.sh index 1f75abf..036f36a 100755 --- a/test.sh +++ b/test.sh @@ -4,17 +4,15 @@ if ! [ -d .venv ]; then python -m venv .venv fi if [ -f .venv/Scripts/activate ]; then + make .venv/Scripts/activate source .venv/Scripts/activate elif [ -f .venv/bin/activate ]; then + make .venv/bin/activate source .venv/bin/activate else echo "venv missing" exit 1 fi -python -m pip install wheel -python -m pip install \ - --extra-index-url=https://wheels.eeems.codes/ \ - -r requirements.txt if [ ! -f test32.ext4 ] || [ ! -f test32.ext4.tmp ] || [ ! -f test64.ext4 ] || [ ! -f test64.ext4.tmp ] || [ ! -f test_htree.ext4 ]; then ./_test_image.sh trap "rm -f test{32,64,_htree}.ext4{,.tmp}" EXIT