diff --git a/dissect/target/plugins/os/unix/log/btmp.py b/dissect/target/plugins/os/unix/log/btmp.py deleted file mode 100644 index 4624045af..000000000 --- a/dissect/target/plugins/os/unix/log/btmp.py +++ /dev/null @@ -1,63 +0,0 @@ -import socket -import struct - -from dissect.util.ts import from_unix - -from dissect.target.helpers.record import TargetRecordDescriptor -from dissect.target.plugin import Plugin, export -from dissect.target.plugins.os.unix.log.utmp import UtmpFile, utmp - -BtmpRecord = TargetRecordDescriptor( - "linux/log/btmp", - [ - ("datetime", "ts"), - ("string", "ut_type"), - ("string", "ut_user"), - ("varint", "ut_pid"), - ("string", "ut_line"), - ("string", "ut_id"), - ("string", "ut_host"), - ("net.ipaddress", "ut_addr"), - ], -) - - -class BtmpPlugin(Plugin): - """btmp log records failed login attempts""" - - def check_compatible(self): - check = list(self.target.fs.glob("/var/log/btmp*")) - return len(check) > 0 - - @export(record=[BtmpRecord]) - def btmp(self): - """Return failed login attempts stored in the btmp file. - - On a Linux system, failed login attempts are stored in the btmp file located in the var/log/ folder. - - References: - - https://en.wikipedia.org/wiki/Utmp - - https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/ - """ - btmp_paths = self.target.fs.glob("/var/log/btmp*") - for btmp_path in btmp_paths: - if "gz" in btmp_path: - btmp = UtmpFile(self.target.fs.open(btmp_path), compressed=True) - else: - btmp = UtmpFile(self.target.fs.open(btmp_path)) - r_type = "" - for entry in btmp: - if entry.ut_type in utmp.Type.reverse: - r_type = utmp.Type.reverse[entry.ut_type] - - yield BtmpRecord( - ts=from_unix(entry.ut_tv.tv_sec), - ut_type=r_type, - ut_pid=entry.ut_pid, - ut_user=entry.ut_user.decode().strip("\x00"), - ut_line=entry.ut_line.decode().strip("\x00"), - ut_id=entry.ut_id.decode().strip("\x00"), - ut_host=entry.ut_host.decode().strip("\x00"), - ut_addr=socket.inet_ntoa(struct.pack(" 4 bytes + ut_addr = ipaddress.ip_address(struct.pack("<4i", *entry.ut_addr_v6)) + else: + try: + if isinstance(ipaddress.ip_address(ut_host), ipaddress.IPv6Address): + # IPv6 address that uses 4 bytes with 12 bytes of trailing zeroes. + ut_addr = ipaddress.ip_address(struct.pack("<4i", *entry.ut_addr_v6)) + elif isinstance(ipaddress.ip_address(ut_host), ipaddress.IPv4Address): + # IPv4 address (ut_host, ut_addr_v6) + ut_addr = ipaddress.ip_address(struct.pack(" None: + if not self.target.os == OperatingSystem.LINUX and not any( + [ + list(self.target.fs.glob(self.BTMP_GLOB)), + list(self.target.fs.glob(self.WTMP_GLOB)), + ] + ): + raise UnsupportedPluginError("No WTMP or BTMP log files found") + + @export(record=[BtmpRecord]) + def btmp(self) -> Iterator[BtmpRecord]: + """Return failed login attempts stored in the btmp file. + + On a Linux system, failed login attempts are stored in the btmp file located in the var/log/ folder. + + References: + - https://en.wikipedia.org/wiki/Utmp + - https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/ + """ + btmp_paths = self.target.fs.glob(self.BTMP_GLOB) + for btmp_path in btmp_paths: + btmp = UtmpFile(self.target, btmp_path) + + for entry in btmp: + yield BtmpRecord( + ts=entry.ts, + ut_type=entry.ut_type, + ut_pid=entry.ut_pid, + ut_user=entry.ut_user, + ut_line=entry.ut_line, + ut_id=entry.ut_id, + ut_host=entry.ut_host, + ut_addr=entry.ut_addr, + _target=self.target, + ) + + @export(record=[WtmpRecord]) + def wtmp(self) -> Iterator[WtmpRecord]: + """Return the content of the wtmp log files. + + The wtmp file contains the historical data of the utmp file. The utmp file contains information about users + logins at which terminals, logouts, system events and current status of the system, system boot time + (used by uptime) etc. + + References: + - https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/ + """ + wtmp_paths = self.target.fs.glob(self.WTMP_GLOB) + for wtmp_path in wtmp_paths: + wtmp = UtmpFile(self.target, wtmp_path) + + for entry in wtmp: + yield WtmpRecord( + ts=entry.ts, + ut_type=entry.ut_type, + ut_pid=entry.ut_pid, + ut_user=entry.ut_user, + ut_line=entry.ut_line, + ut_id=entry.ut_id, + ut_host=entry.ut_host, + ut_addr=entry.ut_addr, + _target=self.target, + ) diff --git a/dissect/target/plugins/os/unix/log/wtmp.py b/dissect/target/plugins/os/unix/log/wtmp.py deleted file mode 100644 index ea278b4ca..000000000 --- a/dissect/target/plugins/os/unix/log/wtmp.py +++ /dev/null @@ -1,62 +0,0 @@ -import socket -import struct - -from dissect.util.ts import from_unix - -from dissect.target.helpers.record import TargetRecordDescriptor -from dissect.target.plugin import Plugin, export -from dissect.target.plugins.os.unix.log.utmp import UtmpFile, utmp - -WtmpRecord = TargetRecordDescriptor( - "linux/log/wtmp", - [ - ("datetime", "ts"), - ("string", "ut_type"), - ("string", "ut_user"), - ("varint", "ut_pid"), - ("string", "ut_line"), - ("string", "ut_id"), - ("string", "ut_host"), - ("net.ipaddress", "ut_addr"), - ], -) - - -class WtmpPlugin(Plugin): - def check_compatible(self): - check = list(self.target.fs.glob("/var/log/wtmp*")) - return len(check) > 0 - - @export(record=[WtmpRecord]) - def wtmp(self): - """Return the content of the wtmp log files. - - The wtmp file contains the historical data of the utmp file. The utmp file contains information about users - logins at which terminals, logouts, system events and current status of the system, system boot time - (used by uptime) etc. - - References: - - https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/ - """ - wtmp_paths = self.target.fs.glob("/var/log/wtmp*") - for wtmp_path in wtmp_paths: - if "gz" in wtmp_path: - wtmp = UtmpFile(self.target.fs.open(wtmp_path), compressed=True) - else: - wtmp = UtmpFile(self.target.fs.open(wtmp_path)) - r_type = "" - for entry in wtmp: - if entry.ut_type in utmp.Type.reverse: - r_type = utmp.Type.reverse[entry.ut_type] - - yield WtmpRecord( - ts=from_unix(entry.ut_tv.tv_sec), - ut_type=r_type, - ut_pid=entry.ut_pid, - ut_user=entry.ut_user.decode().strip("\x00"), - ut_line=entry.ut_line.decode().strip("\x00"), - ut_id=entry.ut_id.decode().strip("\x00"), - ut_host=entry.ut_host.decode().strip("\x00"), - ut_addr=socket.inet_ntoa(struct.pack(" None: + data_file = absolute_path("data/plugins/os/unix/log/btmp/btmp-ipv6") + fs_linux.map_file("var/log/btmp", data_file) - target_unix.add_plugin(WtmpPlugin) + target_linux.add_plugin(UtmpPlugin) - results = list(target_unix.wtmp()) + results = list(target_linux.btmp()) + + # IPv4 address + results[0].ut_host == "127.0.0.1" + results[0].ut_addr == "127.0.0.1" + + # IPv6 address + results[4].ut_host == "1337::1" + results[4].ut_addr == "1337::1" + + # IPv6 address with 12 bytes of trailing zeroes + results[5].ut_host == "1337:1::" + results[5].ut_addr == "1337:1::" + + +def test_wtmp_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + data_file = absolute_path("data/plugins/os/unix/log/wtmp/wtmp") + fs_linux.map_file("var/log/wtmp", data_file) + + target_linux.add_plugin(UtmpPlugin) + + results = list(target_linux.wtmp()) assert len(results) == 70 result = results[-1] assert result.ts == datetime(2021, 11, 12, 10, 12, 54, tzinfo=timezone.utc) @@ -27,13 +49,13 @@ def test_wtmp_plugin(target_unix, fs_unix): assert result.ut_addr == "0.0.0.0" -def test_lastlog_plugin(target_unix_users, fs_unix): - data_file = absolute_path("data/unix/logs/lastlog") - fs_unix.map_file("/var/log/lastlog", data_file) +def test_lastlog_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + data_file = absolute_path("data/plugins/os/unix/log/lastlog/lastlog") + fs_linux.map_file("/var/log/lastlog", data_file) - target_unix_users.add_plugin(LastLogPlugin) + target_linux.add_plugin(LastLogPlugin) - results = list(target_unix_users.lastlog()) + results = list(target_linux.lastlog()) assert len(results) == 1 assert results[0].ts == datetime(2021, 12, 8, 16, 14, 6, tzinfo=timezone.utc) @@ -43,13 +65,13 @@ def test_lastlog_plugin(target_unix_users, fs_unix): assert results[0].ut_tty == "pts/0" -def test_btmp_plugin(target_unix, fs_unix): - data_file = absolute_path("data/unix/logs/btmp") - fs_unix.map_file("var/log/btmp", data_file) +def test_btmp_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + data_file = absolute_path("data/plugins/os/unix/log/btmp/btmp") + fs_linux.map_file("var/log/btmp", data_file) - target_unix.add_plugin(BtmpPlugin) + target_linux.add_plugin(UtmpPlugin) - results = list(target_unix.btmp()) + results = list(target_linux.btmp()) assert len(results) == 10 result = results[-1] assert result.ts == datetime(2021, 11, 30, 23, 2, 9, tzinfo=timezone.utc) @@ -62,8 +84,8 @@ def test_btmp_plugin(target_unix, fs_unix): assert result.ut_addr == "8.210.13.5" -def test_atop_plugin(target_unix, fs_unix): - data_file = absolute_path("data/unix/logs/atop") +def test_atop_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + data_file = absolute_path("data/plugins/os/unix/log/atop/atop") fs_unix.map_file("var/log/atop/atop_20221111", data_file) target_unix.add_plugin(AtopPlugin)