Skip to content

Commit

Permalink
Added support for voidtools everything DB
Browse files Browse the repository at this point in the history
  • Loading branch information
cobyge committed Jan 27, 2024
1 parent b49ee9b commit eaffcfa
Show file tree
Hide file tree
Showing 5 changed files with 447 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ tests/_data/plugins/os/windows/notifications/wpndatabase.db filter=lfs diff=lfs
tests/_data/volumes/bde/enc-volume.bin filter=lfs diff=lfs merge=lfs -text
tests/_data/volumes/md/md-nested.bin.gz filter=lfs diff=lfs merge=lfs -text
tests/_data/loaders/tar/test-anon-filesystems.tar filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/windows/everything/everything.db filter=lfs diff=lfs merge=lfs -text
298 changes: 298 additions & 0 deletions dissect/target/helpers/locate/everything.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
import bz2
import logging
import dataclasses
from datetime import datetime

from typing import IO, Iterator, Optional, Union
from enum import auto, IntFlag
from dissect.util.ts import wintimestamp

logger = logging.getLogger(__name__)

# Magic bytes of the current, supported database format
FILE_MAGIC = b"ESDb"
# EZDB is an old format, used for EverythingDB 1.6.6 and 1.6.7 (Everything 1.2.1, back in 2009)
UNSUPPORTED_FILE_MAGIC = b"EZDB"
# Everything can store the database bzip2-compressed; this is the bzip2 stream header
BZIP_HEADER = b"BZh9"


class EverythingFlags(IntFlag):
    """Feature bitmask from the database header.

    Each bit signals that a corresponding optional metadata field is stored
    for every folder/file record in the database.
    """

    HasFileSize = 0x01
    HasDateCreated = 0x02
    HasDateModified = 0x04
    HasDateAccessed = 0x08
    HasAttributes = 0x10
    HasFolderSize = 0x20


class EverythingIndexObj:
    """Mutable record used while parsing folder entries.

    Folder basenames arrive before the full parent chain is known, so the
    parser first collects these objects and only afterwards resolves full
    paths via :meth:`resolve_path`.
    """

    def __init__(self) -> None:
        # Index into the parser's filesystem list; set only for root folders
        # (anything not explicitly given a parent_index gets an fs_index)
        self.fs_index = None
        # Folder basename only; full paths are produced by resolve_path()
        self.file_path = None
        # Index of the parent folder in the shared folder list (None for roots)
        self.parent_index = None
        # Optional metadata, populated according to the database feature flags
        self.size = None
        self.date_created = None
        self.date_modified = None
        self.date_accessed = None
        self.attributes = None

    def resolve_path(self, folder_list) -> str:
        """Return the full backslash-separated path by walking parent links."""
        if self.parent_index is None:
            # Root folder: the basename is already the whole path
            return self.file_path
        parent_path = folder_list[self.parent_index].resolve_path(folder_list)
        return f"{parent_path}\\{self.file_path}"


@dataclasses.dataclass
class EverythingF:
    """Base record for a single entry from an Everything database.

    Optional fields are ``None`` when the database was not configured to
    store the corresponding metadata (see ``EverythingFlags``).
    """

    # Full backslash-separated path of the entry
    file_path: str
    size: Union[int, None]
    date_created: Union[datetime, None]
    date_modified: Union[datetime, None]
    date_accessed: Union[datetime, None]
    # Raw attribute bitmask as stored in the database
    attributes: Union[int, None]


class EverythingDirectory(EverythingF):
    """Marker subclass for folder entries yielded by the parser."""


class EverythingFile(EverythingF):
    """Marker subclass for file entries yielded by the parser."""


class EverythingDBParser:
    """Parser for voidtools Everything index databases (``Everything.db``).

    Supports plain and bzip2-compressed databases carrying the ``ESDb`` magic
    (database format version 1.7.20, as written by Everything 1.4.x).
    Iterating over an instance yields an ``EverythingDirectory`` record for
    every indexed folder, followed by an ``EverythingFile`` record for every
    indexed file.
    """

    def __init__(self, file_handle: IO[bytes]):
        """Validate the database header and parse the filesystem metadata.

        Args:
            file_handle: Binary file-like object positioned at the start of the database.

        Raises:
            NotImplementedError: If the database uses the legacy ``EZDB`` format,
                or contains structures this parser does not implement yet.
            ValueError: If the magic is unknown or a filesystem type is unsupported.
        """
        self.fh = file_handle
        magic = self.__parse_magic(self.fh)

        # Everything supports BZipped databases: rewind over the sniffed
        # header so the decompressor sees the whole stream, then re-read the
        # real magic from the decompressed data.
        if magic == BZIP_HEADER:
            self.fh.seek(-4, 1)
            self.fh = bz2.open(self.fh)
            magic = self.__parse_magic(self.fh)

        if magic == UNSUPPORTED_FILE_MAGIC:
            raise NotImplementedError(f"{UNSUPPORTED_FILE_MAGIC} files are not yet supported")

        if magic != FILE_MAGIC:
            raise ValueError(f"is not a known EverythingDB file. Magic: {magic.decode()}")

        self.__db_version = self.__parse_db_version()

        # 1.7.20 is the latest database format as of Everything v1.4.1 (2017).
        # This is the version of the database format, not of Everything itself.
        # Earlier versions have not been tested, although they might parse fine.
        assert self.__db_version == "1.7.20"

        self.flags = EverythingFlags(self.read_u32())

        self.number_of_folders = self.read_u32()
        self.number_of_files = self.read_u32()
        self.total_filesystem_num = self.read_byte_or_4()

        # Most of the per-filesystem data is currently unused; it is parsed
        # mainly to advance the offset to the folder/file records behind it.
        for _ in range(self.total_filesystem_num):
            fs_type = self.read_byte_or_4()
            fs_out_of_date = self.read_u8()
            if fs_type != 0:
                # Only type 0 has been observed so far — even FAT drives
                # registered as type 0 in testing. If someone has a non-0
                # drive, send a database and support can be added.
                raise ValueError(f"Unsupported FS type {fs_type}")
            fs_guid = self.read_len_then_data().decode()
            fs_path = self.read_len_then_data().decode()
            fs_root = self.read_len_then_data().decode()
            include_only = self.read_len_then_data().decode()
            # USN journal ID and next USN of the indexed NTFS volume
            journal_id = self.read_u64()
            next_usn = self.read_u64()

        # Unused in this format
        exclude_flags = self.read_u8()

        # There is more logic here in the original binary, but in test files it
        # does not affect the data (verified by dumping the raw array while
        # debugging in IDA). In practice none of the data is saved; parsing it
        # only advances the offset. This *MIGHT* be affected by a different
        # fs_type or an old (pre-1.7.20) database version.
        #
        # Zero is expected three times because the inner implementation is
        # essentially `for i in range(self.read_byte_or_4()): ...`, called three
        # times in a row. As long as each count is zero, the loop bodies never
        # run and we do not need to implement their logic.
        if [self.read_byte_or_4(), self.read_byte_or_4(), self.read_byte_or_4()] != [0, 0, 0]:
            raise NotImplementedError("Failed to parse database, need to implement support for weird database")

    def __iter__(self) -> Iterator[Union["EverythingFile", "EverythingDirectory"]]:
        """Yield all folder records, then all file records, in database order."""
        folder_list = [EverythingIndexObj() for _ in range(self.number_of_folders)]
        for i in range(self.number_of_folders):
            parent_index = self.read_u32()
            assert parent_index < (
                self.total_filesystem_num + self.number_of_folders
            ), "Invalid folder offset"
            # Parent indexes >= number_of_folders refer to a filesystem root
            # rather than to another folder.
            if parent_index >= self.number_of_folders:
                folder_list[i].fs_index = parent_index - self.number_of_folders
            else:
                folder_list[i].parent_index = parent_index

        # There is a block of code here in the original binary that in tests
        # does absolutely nothing (checked by dumping bytes before and after).
        # It might have to do with fs_type; leaving this note for reference.

        temp_buf = b""
        for i in range(self.number_of_folders):
            # Everything has an "optimization" where all basenames of folders
            # (and files) are saved to disk alphabetically, which allows
            # reusing similar filename buffers. For example, if two
            # alphabetically-consecutive folders are named "Potato" and
            # "PotatoSalad", the first record carries "Potato" with str_len 6,
            # and the second carries str_len 5 (length of "Salad") plus a
            # "read_from_prev" of 5, telling us to drop the last 5 bytes of the
            # previous buffer before appending. The same scheme is used for
            # filenames below.
            prev_size = len(temp_buf)

            # TODO: Different handling for filesystems where fs_type=1
            str_len = self.read_byte_or_4()
            if str_len:
                read_from_prev = self.read_byte_or_4()
                assert read_from_prev <= prev_size, "Invalid folder code offset"
                temp_buf = temp_buf[: prev_size - read_from_prev]
            temp_buf += self.read(str_len)
            folder_list[i].file_path = temp_buf.decode()

            # 16 is the directory attribute bit; used as the default when the
            # database does not store attributes explicitly.
            folder_list[i].attributes = 16

            if EverythingFlags.HasFolderSize in self.flags:
                folder_list[i].size = self.read_u64()
            if EverythingFlags.HasDateCreated in self.flags:
                folder_list[i].date_created = self.read_u64()
            if EverythingFlags.HasDateModified in self.flags:
                folder_list[i].date_modified = self.read_u64()
            if EverythingFlags.HasDateAccessed in self.flags:
                folder_list[i].date_accessed = self.read_u64()
            if EverythingFlags.HasAttributes in self.flags:
                folder_list[i].attributes = self.read_u32()
            # This is used where fs type=3, ignoring for now
            self.read_u64()

        for folder in folder_list:
            # Yielding happens in a second pass because paths can only be
            # resolved once all folder basenames are known.
            yield EverythingDirectory(
                file_path=folder.resolve_path(folder_list),
                size=folder.size,
                attributes=folder.attributes,
                date_created=wintimestamp(folder.date_created) if folder.date_created else None,
                date_modified=wintimestamp(folder.date_modified) if folder.date_modified else None,
                date_accessed=wintimestamp(folder.date_accessed) if folder.date_accessed else None,
            )

        temp_buf = b""
        for _ in range(self.number_of_files):
            # See the folder loop above for how the shared-prefix buffer works.
            # TODO: Different handling for filesystems where fs_type=1
            prev_size = len(temp_buf)
            parent_index = self.read_u32()
            assert not (
                parent_index > self.total_filesystem_num + self.number_of_folders
            ), "Invalid parent folder offset"
            assert parent_index < self.number_of_folders, "Something weird"

            file_name = folder_list[parent_index].resolve_path(folder_list)
            str_len = self.read_byte_or_4()
            if str_len:
                read_from_prev = self.read_byte_or_4()
                assert read_from_prev <= prev_size, "Invalid file code offset"
                temp_buf = temp_buf[: prev_size - read_from_prev]
            temp_buf += self.read(str_len)
            file_size = self.read_u64() if EverythingFlags.HasFileSize in self.flags else None
            date_created = self.read_u64() if EverythingFlags.HasDateCreated in self.flags else None
            date_modified = self.read_u64() if EverythingFlags.HasDateModified in self.flags else None
            date_accessed = self.read_u64() if EverythingFlags.HasDateAccessed in self.flags else None
            attributes = self.read_u32() if EverythingFlags.HasAttributes in self.flags else None

            try:
                yield EverythingFile(
                    file_path=f"{file_name}\\{temp_buf.decode()}",
                    size=file_size,
                    attributes=attributes,
                    date_created=wintimestamp(date_created) if date_created else None,
                    date_modified=wintimestamp(date_modified) if date_modified else None,
                    date_accessed=wintimestamp(date_accessed) if date_accessed else None,
                )
            # This shouldn't be possible, but it happened in tests to folders
            # in the recycle bin.
            except UnicodeDecodeError as e:
                logger.warning("Failed parsing filepath: %s\\%s", file_name, temp_buf, exc_info=e)

    @staticmethod
    def __parse_magic(reader: IO[bytes]) -> bytes:
        """Read and return the 4-byte magic from ``reader``."""
        return reader.read(4)

    def read(self, i: int) -> bytes:
        """Read ``i`` bytes from the (possibly decompressed) database stream."""
        return self.fh.read(i)

    def read_u8(self) -> int:
        """Read a single unsigned byte."""
        return self.fh.read(1)[0]

    def read_u32(self) -> int:
        """Read a little-endian unsigned 32-bit integer."""
        return int.from_bytes(self.fh.read(4), byteorder="little")

    def read_u64(self) -> int:
        """Read a little-endian unsigned 64-bit integer."""
        return int.from_bytes(self.fh.read(8), byteorder="little")

    @property
    def version(self) -> str:
        """Database format version string, e.g. ``"1.7.20"``."""
        return self.__db_version

    def __parse_db_version(self) -> str:
        """Parse the 4-byte version field into a dotted version string.

        This function must only be called from ``__init__`` when ``fh`` is in
        the right position (directly after the magic). The layout is mostly
        guesswork that matches current databases; it works on the latest
        version of the DB format.
        """
        [v1, v2, v3, v4] = self.read(4)
        return f"{v4}.{v3}.{int.from_bytes([v1, v2], byteorder='little')}"

    def read_byte_or_4(self) -> int:
        """Read a variable-length integer: one byte, or, when that byte is
        0xFF, a little-endian signed 32-bit value (saving space for small
        numbers). In decompiled-ish code:

            int v1;
            LOBYTE(v1) = read(fd, 1);
            if ( (_BYTE)v1 == 0xFF ) v1 = read(fd, 4);
            else v1 = (unsigned __int8)v1;
        """
        first = self.read(1)[0]
        if first == 0xFF:
            # NOTE(review): this path has not been observed in a real database
            # yet. The original code raised NotImplementedError here and left
            # an unreachable line referencing an undefined name (`data`);
            # implement it per the decompiled logic above instead.
            return int.from_bytes(self.read(4), byteorder="little", signed=True)
        return first

    def read_len_then_data(self) -> bytes:
        """Read a length prefix (byte-or-4 encoded) followed by that many bytes."""
        data_len = self.read_byte_or_4()
        return self.read(data_len)

0 comments on commit eaffcfa

Please sign in to comment.