diff --git a/dissect/ntfs/attr.py b/dissect/ntfs/attr.py index 5fda0be..2ca4d65 100644 --- a/dissect/ntfs/attr.py +++ b/dissect/ntfs/attr.py @@ -2,7 +2,7 @@ import io from datetime import datetime -from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Optional from dissect.util.stream import RangeStream, RunlistStream from dissect.util.ts import wintimestamp @@ -24,7 +24,7 @@ class Attribute: """Parse and interact with MFT attributes. - Wrapper for an AttributeHeader and AttributeRecord combination. + Wrapper for an :class:`AttributeHeader` and :class:`AttributeRecord` combination. Args: record: The MFT record this attribute belongs to. @@ -86,7 +86,7 @@ def name(self) -> str: """Return the name of this attribute.""" return self.header.name - def dataruns(self) -> List[Tuple[int, int]]: + def dataruns(self) -> list[tuple[int, int]]: """Return the dataruns of this attribute, if non-resident. Raises: @@ -194,7 +194,7 @@ def compression_unit(self) -> Optional[int]: """Return the compression unit if non-resident, else None.""" return self.header.Form.Nonresident.CompressionUnit if not self.resident else None - def dataruns(self) -> List[Tuple[int, int]]: + def dataruns(self) -> list[tuple[int, int]]: """Return the dataruns of this attribute, if non-resident. Raises: @@ -281,7 +281,7 @@ def from_fh( ) -> AttributeRecord: """Parse an attribute from a file-like object. - Selects a more specific AttributeRecord class if one is available for the given attribute type. + Selects a more specific :class:`AttributeRecord` class if one is available for the given attribute type. Args: fh: The file-like object to parse an attribute from. @@ -292,7 +292,7 @@ def from_fh( class AttributeList(AttributeRecord): - """Specific AttributeRecord parser for $ATTRIBUTE_LIST.""" + """Specific :class:`AttributeRecord` parser for ``$ATTRIBUTE_LIST``.""" __slots__ = ("entries",) @@ -320,7 +320,7 @@ def __repr__(self) -> str: return "<$ATTRIBUTE_LIST>" def attributes(self) -> Iterator[Attribute]: - """Iterate all entries within this $ATTRIBUTE_LIST and yield all embedded attributes.""" + """Iterate all entries within this ``$ATTRIBUTE_LIST`` and yield all embedded attributes.""" if not self.record: raise MftNotAvailableError("Can't iterate $ATTRIBUTE_LIST attributes without a bounded MFT record") @@ -339,7 +339,7 @@ def attributes(self) -> Iterator[Attribute]: class StandardInformation(AttributeRecord): - """Specific AttributeRecord parser for $STANDARD_INFORMATION.""" + """Specific :class:`AttributeRecord` parser for ``$STANDARD_INFORMATION``.""" __slots__ = ("attr",) @@ -354,62 +354,62 @@ def __repr__(self) -> str: @property def creation_time(self) -> datetime: - """Return the $STANDARD_INFORMATION CreationTime.""" + """Return the ``$STANDARD_INFORMATION`` ``CreationTime``.""" return wintimestamp(self.attr.CreationTime) @property def creation_time_ns(self) -> int: - """Return the $STANDARD_INFORMATION CreationTime in nanoseconds.""" + """Return the ``$STANDARD_INFORMATION`` ``CreationTime`` in nanoseconds.""" return ts_to_ns(self.attr.CreationTime) @property def last_modification_time(self) -> datetime: - """Return the $STANDARD_INFORMATION LastModificationTime.""" + """Return the ``$STANDARD_INFORMATION`` ``LastModificationTime``.""" return wintimestamp(self.attr.LastModificationTime) @property def last_modification_time_ns(self) -> int: - """Return the $STANDARD_INFORMATION LastModificationTime in nanoseconds.""" + """Return the ``$STANDARD_INFORMATION`` ``LastModificationTime`` in nanoseconds.""" return ts_to_ns(self.attr.LastModificationTime) @property def last_change_time(self) -> datetime: - """Return the $STANDARD_INFORMATION LastChangeTime.""" + """Return the ``$STANDARD_INFORMATION`` ``LastChangeTime``.""" return wintimestamp(self.attr.LastChangeTime) @property def last_change_time_ns(self) -> int: - """Return the $STANDARD_INFORMATION LastChangeTime in nanoseconds.""" + """Return the ``$STANDARD_INFORMATION`` ``LastChangeTime`` in nanoseconds.""" return ts_to_ns(self.attr.LastChangeTime) @property def last_access_time(self) -> datetime: - """Return the $STANDARD_INFORMATION LastAccessTime.""" + """Return the ``$STANDARD_INFORMATION`` ``LastAccessTime``.""" return wintimestamp(self.attr.LastAccessTime) @property def last_access_time_ns(self) -> int: - """Return the $STANDARD_INFORMATION LastAccessTime in nanoseconds.""" + """Return the ``$STANDARD_INFORMATION`` ``LastAccessTime`` in nanoseconds.""" return ts_to_ns(self.attr.LastAccessTime) @property def file_attributes(self) -> int: - """Return the $STANDARD_INFORMATION FileAttributes.""" + """Return the ``$STANDARD_INFORMATION`` ``FileAttributes``.""" return self.attr.FileAttributes @property def owner_id(self) -> int: - """Return the $STANDARD_INFORMATION OwnerId.""" + """Return the ``$STANDARD_INFORMATION`` ``OwnerId``.""" return self.attr.OwnerId @property def security_id(self) -> int: - """Return the $STANDARD_INFORMATION SecurityId.""" + """Return the ``$STANDARD_INFORMATION`` ``SecurityId``.""" return self.attr.SecurityId class FileName(AttributeRecord): - """Specific AttributeRecord parser for $FILE_NAME.""" + """Specific :class:`AttributeRecord` parser for ``$FILE_NAME``.""" __slots__ = ("attr",) @@ -423,62 +423,62 @@ def __repr__(self) -> str: @property def creation_time(self) -> datetime: - """Return the $FILE_NAME file CreationTime.""" + """Return the ``$FILE_NAME``file ``CreationTime``.""" return wintimestamp(self.attr.CreationTime) @property def creation_time_ns(self) -> int: - """Return the $FILE_NAME file CreationTime in nanoseconds.""" + """Return the ``$FILE_NAME`` file ``CreationTime`` in nanoseconds.""" return ts_to_ns(self.attr.CreationTime) @property def last_modification_time(self) -> datetime: - """Return the $FILE_NAME file LastModificationTime.""" + """Return the ``$FILE_NAME`` file ``LastModificationTime``.""" return wintimestamp(self.attr.LastModificationTime) @property def last_modification_time_ns(self) -> int: - """Return the $FILE_NAME file LastModificationTime in nanoseconds.""" + """Return the ``$FILE_NAME`` file ``LastModificationTime`` in nanoseconds.""" return ts_to_ns(self.attr.LastModificationTime) @property def last_change_time(self) -> datetime: - """Return the $FILE_NAME file LastChangeTime.""" + """Return the ``$FILE_NAME`` file ``LastChangeTime``.""" return wintimestamp(self.attr.LastChangeTime) @property def last_change_time_ns(self) -> int: - """Return the $FILE_NAME file LastChangeTime in nanoseconds.""" + """Return the ``$FILE_NAME`` file ``LastChangeTime`` in nanoseconds.""" return ts_to_ns(self.attr.LastChangeTime) @property def last_access_time(self) -> datetime: - """Return the $FILE_NAME file LastAccessTime.""" + """Return the ``$FILE_NAME`` file ``LastAccessTime``.""" return wintimestamp(self.attr.LastAccessTime) @property def last_access_time_ns(self) -> int: - """Return the $FILE_NAME file LastAccessTime in nanoseconds.""" + """Return the ``$FILE_NAME`` file ``LastAccessTime`` in nanoseconds.""" return ts_to_ns(self.attr.LastAccessTime) @property def file_size(self) -> int: - """Return the $FILE_NAME file FileSize.""" + """Return the ``$FILE_NAME`` file ``FileSize``.""" return self.attr.FileSize @property def file_attributes(self) -> int: - """Return the $FILE_NAME file FileAttributes.""" + """Return the ``$FILE_NAME`` file ``FileAttributes``.""" return self.attr.FileAttributes @property def flags(self) -> int: - """Return the $FILE_NAME flags, which can be either FILE_NAME_NTFS or FILE_NAME_DOS.""" + """Return the ``$FILE_NAME`` flags, which can be either ``FILE_NAME_NTFS`` or ``FILE_NAME_DOS``.""" return self.attr.Flags @property def file_name(self) -> str: - """Return the file name string stored in this $FILE_NAME attribute.""" + """Return the file name string stored in this ``$FILE_NAME`` attribute.""" return self.attr.FileName def full_path(self) -> str: @@ -487,7 +487,7 @@ def full_path(self) -> str: class ReparsePoint(AttributeRecord): - """Specific AttributeRecord parser for $REPARSE_POINT.""" + """Specific :class:`AttributeRecord` parser for ``$REPARSE_POINT``.""" __slots__ = ("attr", "tag_header", "buffer") diff --git a/dissect/ntfs/index.py b/dissect/ntfs/index.py index 16a0f6e..f06b764 100644 --- a/dissect/ntfs/index.py +++ b/dissect/ntfs/index.py @@ -3,7 +3,7 @@ import io from enum import Enum, auto from functools import cached_property, lru_cache -from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, List, Optional +from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, Optional from dissect.ntfs.attr import AttributeRecord from dissect.ntfs.c_ntfs import ( @@ -64,7 +64,7 @@ def __iter__(self) -> Iterator[IndexEntry]: @lru_cache(128) def index_buffer(self, vcn: int) -> IndexBuffer: - """Return the IndexBuffer at the specified cluster number. + """Return the :class:`IndexBuffer` at the specified cluster number. Args: vcn: The virtual cluster number within the index allocation to read. @@ -122,7 +122,7 @@ def search( return entry def entries(self) -> Iterator[IndexEntry]: - """Yield all IndexEntry's in this Index.""" + """Yield all :class:`IndexEntry`'s in this :class:`Index`.""" for entry in self.root.entries(): if entry.is_end: @@ -148,10 +148,10 @@ def entries(self) -> Iterator[IndexEntry]: class IndexRoot: - """Represents the $INDEX_ROOT. + """Represents the ``$INDEX_ROOT``. Args: - index: The Index class instance this IndexRoot belongs to. + index: The :class:`Index`` class instance this :class:`IndexRoot` belongs to. fh: The file-like object to parse an index root on. """ @@ -182,7 +182,7 @@ def clusters_per_index_buffer(self) -> int: return self.header.ClustersPerIndexBuffer def entries(self) -> Iterator[IndexEntry]: - """Yield all IndexEntry's in this IndexRoot.""" + """Yield all :class:`IndexEntry`'s in this :class:`IndexRoot`.""" yield from _iter_entries( self.index, self.fh, @@ -193,11 +193,11 @@ def entries(self) -> Iterator[IndexEntry]: class IndexBuffer: - """Represent an index buffer in $INDEX_ALLOCATION. + """Represent an index buffer in ``$INDEX_ALLOCATION``. Args: - index: The Index class instance this IndexRoot belongs to. - fh: The file-like object of $INDEX_ALLOCATION. + index: The :class:`Index` class instance this :class:`IndexRoot` belongs to. + fh: The file-like object of ``$INDEX_ALLOCATION``. offset: The offset in bytes to the index buffer on the file-like object we want to read. size: The size of the index buffer in bytes. @@ -224,7 +224,7 @@ def __init__(self, index: Index, fh: BinaryIO, offset: int, size: int): self.header = c_ntfs._INDEX_ALLOCATION_BUFFER(self.data) def entries(self) -> Iterator[IndexEntry]: - """Yield all IndexEntry's in this IndexBuffer.""" + """Yield all :class:`IndexEntry`'s in this :class:`IndexBuffer`.""" yield from _iter_entries( self.index, io.BytesIO(self.data), @@ -238,7 +238,7 @@ class IndexEntry: """Parse and interact with index entries. Args: - index: The Index class instance this IndexEntry belongs to. + index: The :class:`Index` class instance this :class:`IndexEntry` belongs to. fh: The file-like object to parse an index entry on. offset: The offset in the file-like object to parse an index entry at. """ @@ -253,7 +253,7 @@ def __init__(self, index: Index, fh: BinaryIO, offset: int): self.buf = fh.read(self.header.Length - len(c_ntfs._INDEX_ENTRY)) def dereference(self) -> MftRecord: - """Dereference this IndexEntry to the MFT record it points to. + """Dereference this :class:`IndexEntry` to the MFT record it points to. Note that the file reference is a union with the data part so only access this if you know the entry has a file reference and not a data part. @@ -284,7 +284,7 @@ def data(self) -> bytes: @cached_property def attribute(self) -> Optional[AttributeRecord]: - """Return the AttributeRecord of the attribute contained in this entry.""" + """Return the :class:`dissect.ntfs.attr.AttributeRecord` of the attribute contained in this entry.""" if self.key_length and self.index.root.attribute_type: return AttributeRecord.from_fh( io.BytesIO(self.buf), @@ -337,7 +337,7 @@ def _iter_entries(index: Index, fh: BinaryIO, offset: int, size: int) -> Iterato offset += entry.length -def _bsearch(entries: List[IndexEntry], value: Any, cmp: Callable[[IndexEntry, Any], Match]) -> IndexEntry: +def _bsearch(entries: list[IndexEntry], value: Any, cmp: Callable[[IndexEntry, Any], Match]) -> IndexEntry: min_idx = 0 max_idx = len(entries) - 1 diff --git a/dissect/ntfs/mft.py b/dissect/ntfs/mft.py index 5bfe9ee..95e2c7d 100644 --- a/dissect/ntfs/mft.py +++ b/dissect/ntfs/mft.py @@ -4,17 +4,7 @@ from functools import cached_property, lru_cache from io import BytesIO from operator import itemgetter -from typing import ( - TYPE_CHECKING, - Any, - BinaryIO, - Dict, - Iterator, - List, - Optional, - Tuple, - Union, -) +from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Optional, Union from dissect.cstruct import Instance @@ -46,7 +36,7 @@ class Mft: - """Interact with the $MFT (Master File Table). + """Interact with the ``$MFT`` (Master File Table). Args: fh: A file-like object of the $MFT file. @@ -104,9 +94,9 @@ def get(self, ref: Union[int, str, Instance], root: Optional[MftRecord] = None) """Retrieve an MFT record using a variety of methods. Supported references are: - - _MFT_SEGMENT_REFERENCE cstruct instance - - integer segment number - - string file path + - ``_MFT_SEGMENT_REFERENCE`` cstruct instance + - integer segment number + - string file path Args: ref: Reference to retrieve the record by. @@ -146,7 +136,7 @@ def segments(self) -> Iterator[MftRecord]: class MftRecord: """MFT record parsing and interaction. - Use the from_fh or from_bytes class methods to instantiate. + Use the :func:`~MftRecord.from_fh` or :func:`~MftRecord.from_bytes` class methods to instantiate. """ def __init__(self): @@ -206,7 +196,7 @@ def from_bytes(cls, data: bytes, ntfs: Optional[NTFS] = None) -> MftRecord: return obj def get(self, path: str) -> MftRecord: - """Retrieve a MftRecord relative to this one. + """Retrieve a :class:`MftRecord` relative to this one. Args: path: The path to lookup. @@ -222,7 +212,7 @@ def get(self, path: str) -> MftRecord: def attributes(self) -> AttributeMap: """Parse and return the attributes in this MFT record. - $ATTRIBUTE_LIST's are only parsed if there's an MFT available on the NTFS object. + ``$ATTRIBUTE_LIST``'s are only parsed if there's an MFT available on the NTFS object. Raises: BrokenMftError: If an error occurred parsing the attributes. @@ -263,16 +253,16 @@ def attributes(self) -> AttributeMap: @cached_property def resident(self) -> bool: - """Return whether this record's default $DATA attribute is resident.""" + """Return whether this record's default ``$DATA`` attribute is resident.""" return any(attr.header.resident for attr in self.attributes[ATTRIBUTE_TYPE_CODE.DATA]) @cached_property def filename(self) -> Optional[str]: - """Return the first file name, or None if this record has no file names.""" + """Return the first file name, or ``None`` if this record has no file names.""" filenames = self.filenames() return filenames[0] if filenames else None - def filenames(self, ignore_dos: bool = False) -> List[str]: + def filenames(self, ignore_dos: bool = False) -> list[str]: """Return all file names of this record. Args: @@ -285,8 +275,8 @@ def filenames(self, ignore_dos: bool = False) -> List[str]: result.append((attr.flags, attr.file_name)) return [item[1] for item in sorted(result, key=itemgetter(0))] - def full_path(self, ignore_dos: bool = False): - """Return the first full path, or None if this record has no file names. + def full_path(self, ignore_dos: bool = False) -> Optional[str]: + """Return the first full path, or ``None`` if this record has no file names. Args: ignore_dos: Ignore DOS file name entries. @@ -294,7 +284,7 @@ def full_path(self, ignore_dos: bool = False): paths = self.full_paths(ignore_dos) return paths[0] if paths else None - def full_paths(self, ignore_dos: bool = False): + def full_paths(self, ignore_dos: bool = False) -> list[str]: """Return all full paths of this record. Args: @@ -349,7 +339,7 @@ def reparse_point_substitute_name(self) -> str: def reparse_point_record(self) -> MftRecord: """Resolve a reparse point and return the target record. - Note: absolute links (such as directory junctions) will _always_ fail in the context of a single filesystem. + Note: absolute links (such as directory junctions) will *always* fail in the context of a single filesystem. Absolute links include the drive letter, of which we have no knowledge here. """ if not self.is_reparse_point(): @@ -369,7 +359,7 @@ def reparse_point_record(self) -> MftRecord: def _get_stream_attributes( self, name: str, attr_type: ATTRIBUTE_TYPE_CODE = ATTRIBUTE_TYPE_CODE.DATA ) -> AttributeCollection[Attribute]: - """Return the AttributeCollection of all attributes with the given name and attribute type. + """Return the :class:`~dissect.ntfs.util.AttributeCollection` of all attributes with the given name and attribute type. Args: name: The attribute name, often an empty string. @@ -377,7 +367,7 @@ def _get_stream_attributes( Raises: FileNotFoundError: If there are no attributes with the given name and type. - """ + """ # noqa: E501 attrs = self.attributes.find(name, attr_type) if not attrs: raise FileNotFoundError(f"No such stream on record {self}: ({name!r}, {attr_type})") @@ -421,7 +411,7 @@ def size( def dataruns( self, name: str = "", attr_type: ATTRIBUTE_TYPE_CODE = ATTRIBUTE_TYPE_CODE.DATA - ) -> List[Tuple[int, int]]: + ) -> list[tuple[int, int]]: """Return the dataruns of the given stream name and type. Args: @@ -441,7 +431,7 @@ def index(self, name: str) -> Index: """Open an index on this record. Args: - name: The index name to open. For example, "$I30". + name: The index name to open. For example, ``"$I30"``. """ return Index(self, name) @@ -449,12 +439,12 @@ def iterdir(self, dereference: bool = False, ignore_dos: bool = False) -> Iterat """Yield directory entries of this record. Args: - dereference: Determines whether to resolve the IndexEntry's to MftRecord's. This impacts performance. + dereference: Determines whether to resolve the :class:`~dissect.ntfs.index.IndexEntry`'s to :class:`MftRecord`'s. This impacts performance. ignore_dos: Ignore DOS file name entries. Raises: NotADirectoryError: If this record is not a directory. - """ + """ # noqa: E501 if not self.is_dir(): raise NotADirectoryError(f"{self!r} is not a directory") @@ -463,16 +453,16 @@ def iterdir(self, dereference: bool = False, ignore_dos: bool = False) -> Iterat continue yield entry.dereference() if dereference else entry - def listdir(self, dereference: bool = False, ignore_dos: bool = False) -> Dict[str, Union[IndexEntry, MftRecord]]: + def listdir(self, dereference: bool = False, ignore_dos: bool = False) -> dict[str, Union[IndexEntry, MftRecord]]: """Return a dictionary of the directory entries of this record. Args: - dereference: Determines whether to resolve the IndexEntry's to MftRecord's. This impacts performance. + dereference: Determines whether to resolve the :class:`~dissect.ntfs.index.IndexEntry`'s to :class:`MftRecord`'s. This impacts performance. ignore_dos: Ignore DOS file name entries. Raises: NotADirectoryError: If this record is not a directory. - """ + """ # noqa: E501 result = {} for entry in self.iterdir(dereference, ignore_dos): filenames = entry.filenames(ignore_dos) if dereference else [entry.attribute.file_name] diff --git a/dissect/ntfs/ntfs.py b/dissect/ntfs/ntfs.py index 42116a1..a74c9fd 100644 --- a/dissect/ntfs/ntfs.py +++ b/dissect/ntfs/ntfs.py @@ -1,5 +1,5 @@ from functools import cached_property -from typing import BinaryIO, Iterator, List, Optional, Tuple +from typing import BinaryIO, Iterator, Optional from dissect.ntfs.c_ntfs import ( ATTRIBUTE_TYPE_CODE, @@ -23,15 +23,15 @@ class NTFS: This implementation supports parsing NTFS from either a full NTFS volume or from separate files. If you have a file-like object of an NTFS volume, simply pass it as the fh argument. If you have separate - file-like objects for things like $BOOT or $MFT, pass those as the boot and mft arguments. + file-like objects for things like ``$BOOT`` or ``$MFT``, pass those as the boot and mft arguments. The separate arguments take precedence over parsing from the volume file-like object. Args: fh: A file-like object for the volume to use for parsing NTFS. This is where "data on disk" is read from. - boot: A file-like object for the $BOOT file. - mft: A file-like object for the $MFT file. - usnjrnl: A file-like object for the $Extend/$Usnjrnl:$J file. - sds: A file-like object for the $Secure:$SDS file. + boot: A file-like object for the ``$BOOT`` file. + mft: A file-like object for the ``$MFT`` file. + usnjrnl: A file-like object for the ``$Extend/$Usnjrnl:$J`` file. + sds: A file-like object for the ``$Secure:$SDS`` file. """ def __init__( @@ -146,7 +146,7 @@ def volume_name(self) -> Optional[str]: return None -def _get_dataruns_from_attribute_list(record: MftRecord) -> Iterator[List[Tuple[int, int]]]: +def _get_dataruns_from_attribute_list(record: MftRecord) -> Iterator[list[tuple[int, int]]]: for attr in record.attributes[ATTRIBUTE_TYPE_CODE.ATTRIBUTE_LIST].attributes(): if attr.type == ATTRIBUTE_TYPE_CODE.DATA: yield attr.dataruns() diff --git a/dissect/ntfs/secure.py b/dissect/ntfs/secure.py index bd472d9..eb0e7f7 100644 --- a/dissect/ntfs/secure.py +++ b/dissect/ntfs/secure.py @@ -12,13 +12,13 @@ class Secure: - """Lookup security descriptors from the $Secure file, or optionally just a file-like object of the $SDS. + """Lookup security descriptors from the ``$Secure`` file, or optionally just a file-like object of the ``$SDS``. - Only one the record or sds arguments needs to be provided. + Only one the ``record`` or ``sds`` arguments needs to be provided. Args: - record: The MFT record of the $Secure file, used when opening from a full NTFS volume. - sds: A file-like object of the $SDS stream, used when opening from separate system files. + record: The MFT record of the ``$Secure`` file, used when opening from a full NTFS volume. + sds: A file-like object of the ``$SDS`` stream, used when opening from separate system files. """ def __init__(self, record: MftRecord = None, sds: BinaryIO = None): @@ -68,7 +68,7 @@ def _iter_entries(self, offset: int = 0) -> Iterator[Instance]: def lookup(self, security_id: int) -> SecurityDescriptor: """Lookup a security descriptor by the security ID. - An index is used if available ($SII), otherwise we iterate all entries until we find the correct one. + An index is used if available (``$SII``), otherwise we iterate all entries until we find the correct one. Args: security_id: The security ID to lookup. diff --git a/dissect/ntfs/stream.py b/dissect/ntfs/stream.py index 21e1e86..acff539 100644 --- a/dissect/ntfs/stream.py +++ b/dissect/ntfs/stream.py @@ -1,12 +1,12 @@ import io -from typing import BinaryIO, List, Tuple +from typing import BinaryIO from dissect.util import lznt1 from dissect.util.stream import RunlistStream class CompressedRunlistStream(RunlistStream): - """Specialized RunlistStream for reading NTFS compressed streams. + """Specialized :class:`~dissect.util.stream.RunlistStream` for reading NTFS compressed streams. Args: fh: The source file-like object. @@ -16,7 +16,7 @@ class CompressedRunlistStream(RunlistStream): """ def __init__( - self, fh: BinaryIO, runlist: List[Tuple[int, int]], size: int, cluster_size: int, compression_unit: int + self, fh: BinaryIO, runlist: list[tuple[int, int]], size: int, cluster_size: int, compression_unit: int ): # RunlistStream has block_size but we want to make cluster_size available to be more in line with NTFS naming self.cluster_size = cluster_size @@ -31,11 +31,11 @@ def __init__( self.block_size = cluster_size @property - def runlist(self) -> List[Tuple[int, int]]: + def runlist(self) -> list[tuple[int, int]]: return self._runlist @runlist.setter - def runlist(self, runlist: List[Tuple[int, int]]) -> None: + def runlist(self, runlist: list[tuple[int, int]]) -> None: self._runlist = runlist runs = [] diff --git a/dissect/ntfs/usnjrnl.py b/dissect/ntfs/usnjrnl.py index 6329632..3553caa 100644 --- a/dissect/ntfs/usnjrnl.py +++ b/dissect/ntfs/usnjrnl.py @@ -17,11 +17,11 @@ class UsnJrnl: - """Parse the USN journal from a file-like object of the $UsnJrnl:$J stream. + """Parse the USN journal from a file-like object of the ``$UsnJrnl:$J`` stream. Args: fh: A file-like object of the $UsnJrnl:$J stream. - ntfs: An optional NTFS class instance, used for resolving file paths. + ntfs: An optional :class:`~dissect.ntfs.ntfs.NTFS` class instance, used for resolving file paths. """ def __init__(self, fh: BinaryIO, ntfs: Optional[NTFS] = None): @@ -67,7 +67,7 @@ class UsnRecord: """Parse a USN record from a file-like object and offset. Args: - usnjrnl: The ``UsnJrnl`` class this record is parsed from. + usnjrnl: The :class:`UsnJrnl` class this record is parsed from. fh: The file-like object to parse a USN record from. offset: The offset to parse a USN record at. """ diff --git a/dissect/ntfs/util.py b/dissect/ntfs/util.py index 6c5434f..f587f03 100644 --- a/dissect/ntfs/util.py +++ b/dissect/ntfs/util.py @@ -2,7 +2,7 @@ import struct from collections import UserDict -from typing import TYPE_CHECKING, Any, BinaryIO, List, Optional, Set, Tuple, Union +from typing import TYPE_CHECKING, Any, BinaryIO, Optional, Union from dissect.cstruct import EnumInstance, Instance from dissect.util.stream import RunlistStream @@ -28,14 +28,14 @@ class AttributeMap(UserDict): """Utility dictionary-like object for interacting with a collection of attributes. Allows convenient accessing of attributes added to this collection. For example: - - Get attributes by name, e.g. attributes.DATA to get all $DATA attributes. - - Get attributes by type code enum or integer, e.g. attributes[0x80] or attributes[ATTRIBUTE_TYPE_CODE.DATA]. - - Check attribute membership by enum or integer, e.g. 0x80 in attributes or ATTRIBUTE_TYPE_CODE.DATA in attributes. - - Find all attributes with a given name and type, e.g. attributes.find("$I30", ATTRIBUTE_TYPE_CODE.INDEX_ROOT). + - Get attributes by name, e.g. ``attributes.DATA`` to get all ``$DATA`` attributes. + - Get attributes by type code enum or integer, e.g. ``attributes[0x80]`` or ``attributes[ATTRIBUTE_TYPE_CODE.DATA]``. + - Check attribute membership by enum or integer, e.g. ``0x80 in attributes`` or ``ATTRIBUTE_TYPE_CODE.DATA in attributes``. + - Find all attributes with a given name and type, e.g. ``attributes.find("$I30", ATTRIBUTE_TYPE_CODE.INDEX_ROOT)``. Note that any data retrieval from an ``AttributeMap`` will always succeed and return an :class:`~dissect.ntfs.util.AttributeCollection`, either empty or containing one or more attributes. - """ + """ # noqa: E501 def __getattr__(self, attr: str) -> AttributeCollection: if attr in ATTRIBUTE_TYPE_CODE: @@ -56,7 +56,7 @@ def __contains__(self, key: Union[ATTRIBUTE_TYPE_CODE, int]) -> bool: def add(self, attr: Attribute) -> None: """Add an attribute to the collection. - Note that this is the only intended way to modify the ``AttributeMap``! + Note that this is the only intended way to modify the :class:`AttributeMap`! Args: attr: The attribute to add. @@ -85,11 +85,11 @@ class AttributeCollection(list): Allows convenient access to attribute properties for a list of one or more attributes. - For example, if we have only one attribute we want to access the "size", we want to be able - to do attribute_list.size instead of attribute_list[0].size. + For example, if we have only one attribute we want to access the ``size``, we want to be able + to do ``attribute_list.size`` instead of ``attribute_list[0].size``. Additionally, we can also provide functionality here that we want to perform on a group of - attributes, like open() and size(). + attributes, like ``open()`` and ``size()``. """ def __getattr__(self, attr: str) -> Any: @@ -154,7 +154,7 @@ def size(self, allocated: bool = False) -> int: attrs = self._get_stream_attrs() return attrs[0].header.allocated_size if allocated else attrs[0].header.size - def dataruns(self) -> List[Tuple[int, int]]: + def dataruns(self) -> list[tuple[int, int]]: """Get the dataruns for this list of attributes. Raises: @@ -165,10 +165,10 @@ def dataruns(self) -> List[Tuple[int, int]]: return self._get_dataruns() - def _get_stream_attrs(self) -> List[Attribute]: + def _get_stream_attrs(self) -> list[Attribute]: return sorted((attr for attr in self if not attr.header.resident), key=lambda attr: attr.header.lowest_vcn) - def _get_dataruns(self, attrs: Optional[List[Attribute]] = None) -> List[Tuple[int, int]]: + def _get_dataruns(self, attrs: Optional[list[Attribute]] = None) -> list[tuple[int, int]]: attrs = attrs or self._get_stream_attrs() runs = [] @@ -179,7 +179,7 @@ def _get_dataruns(self, attrs: Optional[List[Attribute]] = None) -> List[Tuple[i def apply_fixup(data: bytes) -> bytes: - """Parse and apply fixup data from MULTI_SECTOR_HEADER to the given bytes. + """Parse and apply fixup data from ``MULTI_SECTOR_HEADER`` to the given bytes. Args: data: The bytes to fixup @@ -225,7 +225,7 @@ def ensure_volume(ntfs: NTFS) -> None: """Check if a volume is available for reading. A volume in this context refers to a disk or other file that contains the raw NTFS data, not contained - in system files like the $MFT. + in system files like the ``$MFT``. Raises: VolumeNotAvailableError: If a volume is not available. @@ -234,7 +234,7 @@ def ensure_volume(ntfs: NTFS) -> None: raise VolumeNotAvailableError() -def get_full_path(mft: Mft, name: str, parent: Instance, seen: Set[str] = None) -> str: +def get_full_path(mft: Mft, name: str, parent: Instance, seen: set[str] = None) -> str: """Walk up parent file references to construct a full path. Args: