Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix UTMP misinterpretation of IPv6 addresses #292

Merged
merged 22 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 0 additions & 63 deletions dissect/target/plugins/os/unix/log/btmp.py

This file was deleted.

180 changes: 172 additions & 8 deletions dissect/target/plugins/os/unix/log/utmp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,43 @@
import gzip
import ipaddress
import struct
from collections import namedtuple
from typing import Iterator

from dissect import cstruct
from dissect.cstruct import cstruct
from dissect.util.stream import BufferedStream
from dissect.util.ts import from_unix

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.fsutil import TargetPath
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import OperatingSystem, Plugin, export
from dissect.target.target import Target

UTMP_FIELDS = [
("datetime", "ts"),
("string", "ut_type"),
("string", "ut_user"),
("varint", "ut_pid"),
("string", "ut_line"),
("string", "ut_id"),
("string", "ut_host"),
("net.ipaddress", "ut_addr"),
]

BtmpRecord = TargetRecordDescriptor(
"linux/log/btmp",
[
*UTMP_FIELDS,
],
)

WtmpRecord = TargetRecordDescriptor(
"linux/log/wtmp",
[
*UTMP_FIELDS,
],
)

c_utmp = """
#define UT_LINESIZE 32
Expand Down Expand Up @@ -43,21 +79,39 @@
struct exit_status ut_exit;
long ut_session;
struct timeval ut_tv;
int32_t ut_addr_v6[4];
int32_t ut_addr_v6[4]; // Internet address of remote host; IPv4 address uses just ut_addr_v6[0]
char __unused[20];
};
"""
""" # noqa: E501

utmp = cstruct.cstruct()
utmp = cstruct()
utmp.load(c_utmp)

UTMP_ENTRY = namedtuple(
"UTMPRecord",
[
"ts",
"ut_type",
"ut_user",
"ut_pid",
"ut_line",
"ut_id",
"ut_host",
"ut_addr",
],
)


class UtmpFile:
"""utmp maintains a full accounting of the current status of the system"""

def __init__(self, fh, compressed=False):
self.fh = fh
self.compressed = compressed
def __init__(self, target: Target, path: TargetPath):
self.fh = target.fs.path(path).open()

if "gz" in path:
self.compressed = True

Check warning on line 112 in dissect/target/plugins/os/unix/log/utmp.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/utmp.py#L112

Added line #L112 was not covered by tests
else:
self.compressed = False

def __iter__(self):
if self.compressed:
Expand All @@ -68,6 +122,116 @@

while True:
try:
yield utmp.entry(byte_stream)
entry = utmp.entry(byte_stream)

r_type = ""
if entry.ut_type in utmp.Type.reverse:
r_type = utmp.Type.reverse[entry.ut_type]

ut_host = entry.ut_host.decode(errors="surrogateescape").strip("\x00")
ut_addr = None

# UTMP misuses the field ut_addr_v6 for IPv4 and IPv6 addresses, because of this the ut_host field
# is used to determine if the ut_addr_v6 is an IPv6 address where the last 12 bytes of trailing zeroes.
if entry.ut_addr_v6:
if not entry.ut_addr_v6[1:] == [0, 0, 0]:
# IPv6 address that uses > 4 bytes
ut_addr = ipaddress.ip_address(struct.pack("<4i", *entry.ut_addr_v6))
else:
try:
if isinstance(ipaddress.ip_address(ut_host), ipaddress.IPv6Address):
# IPv6 address that uses 4 bytes with 12 bytes of trailing zeroes.
ut_addr = ipaddress.ip_address(struct.pack("<4i", *entry.ut_addr_v6))
elif isinstance(ipaddress.ip_address(ut_host), ipaddress.IPv4Address):
# IPv4 address (ut_host, ut_addr_v6)
ut_addr = ipaddress.ip_address(struct.pack("<i", entry.ut_addr_v6[0]))
else:
pass
except ValueError:
# NOTE: in case the ut_host does not contain a valid IPv6 address,
# ut_addr_v6 is parsed as IPv4 address. This could not lead to incorrect results.
ut_addr = ipaddress.ip_address(struct.pack("<i", entry.ut_addr_v6[0]))

utmp_entry = UTMP_ENTRY(
ts=from_unix(entry.ut_tv.tv_sec),
ut_type=r_type,
ut_pid=entry.ut_pid,
ut_user=entry.ut_user.decode(errors="surrogateescape").strip("\x00"),
ut_line=entry.ut_line.decode(errors="surrogateescape").strip("\x00"),
ut_id=entry.ut_id.decode(errors="surrogateescape").strip("\x00"),
ut_host=ut_host,
ut_addr=ut_addr,
)

yield utmp_entry
except EOFError:
break


class UtmpPlugin(Plugin):
WTMP_GLOB = "/var/log/wtmp*"
BTMP_GLOB = "/var/log/btmp*"

def check_compatible(self) -> None:
if not self.target.os == OperatingSystem.LINUX and not any(
[
list(self.target.fs.glob(self.BTMP_GLOB)),
list(self.target.fs.glob(self.WTMP_GLOB)),
]
):
raise UnsupportedPluginError("No WTMP or BTMP log files found")

Check warning on line 182 in dissect/target/plugins/os/unix/log/utmp.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/utmp.py#L182

Added line #L182 was not covered by tests

@export(record=[BtmpRecord])
def btmp(self) -> Iterator[BtmpRecord]:
"""Return failed login attempts stored in the btmp file.

On a Linux system, failed login attempts are stored in the btmp file located in the var/log/ folder.

References:
- https://en.wikipedia.org/wiki/Utmp
- https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/
"""
btmp_paths = self.target.fs.glob(self.BTMP_GLOB)
for btmp_path in btmp_paths:
btmp = UtmpFile(self.target, btmp_path)

for entry in btmp:
yield BtmpRecord(
ts=entry.ts,
ut_type=entry.ut_type,
ut_pid=entry.ut_pid,
ut_user=entry.ut_user,
ut_line=entry.ut_line,
ut_id=entry.ut_id,
ut_host=entry.ut_host,
ut_addr=entry.ut_addr,
_target=self.target,
)

@export(record=[WtmpRecord])
def wtmp(self) -> Iterator[WtmpRecord]:
"""Return the content of the wtmp log files.

The wtmp file contains the historical data of the utmp file. The utmp file contains information about users
logins at which terminals, logouts, system events and current status of the system, system boot time
(used by uptime) etc.

References:
- https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/
"""
wtmp_paths = self.target.fs.glob(self.WTMP_GLOB)
for wtmp_path in wtmp_paths:
wtmp = UtmpFile(self.target, wtmp_path)

for entry in wtmp:
yield WtmpRecord(
ts=entry.ts,
ut_type=entry.ut_type,
ut_pid=entry.ut_pid,
ut_user=entry.ut_user,
ut_line=entry.ut_line,
ut_id=entry.ut_id,
ut_host=entry.ut_host,
ut_addr=entry.ut_addr,
_target=self.target,
)
62 changes: 0 additions & 62 deletions dissect/target/plugins/os/unix/log/wtmp.py

This file was deleted.

19 changes: 19 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ def fs_unix():
yield fs


@pytest.fixture
def fs_linux():
fs = VirtualFilesystem()
fs.makedirs("var")
fs.makedirs("etc")
fs.makedirs("opt")
yield fs


@pytest.fixture
def fs_osx():
fs = VirtualFilesystem()
Expand Down Expand Up @@ -131,6 +140,16 @@ def target_unix(fs_unix):
yield mock_target


@pytest.fixture
def target_linux(fs_linux):
mock_target = next(make_mock_target())

mock_target.filesystems.add(fs_linux)
mock_target.fs.mount("/", fs_linux)
mock_target.apply()
yield mock_target


@pytest.fixture
def target_osx(fs_osx):
mock_target = next(make_mock_target())
Expand Down
File renamed without changes.
File renamed without changes.
Binary file added tests/data/plugins/os/unix/log/btmp/btmp-ipv6
Binary file not shown.
File renamed without changes.
Binary file added tests/data/plugins/os/unix/log/wtmp/wtmp-ipv6
Binary file not shown.
Loading