/
stat.py
148 lines (131 loc) · 3.96 KB
/
stat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"`#include <sys/socket.h>`"
from __future__ import annotations
from dataclasses import dataclass
from rsyscall._raw import lib, ffi # type: ignore
from rsyscall.handle.fd import BaseFileDescriptor
from rsyscall.handle.pointer import Pointer
from rsyscall.near.sysif import SyscallInterface
from rsyscall.struct import Struct, Serializable
from rsyscall.sys.syscall import SYS
from rsyscall.time import Timespec
import enum
import rsyscall.near.types as near
import typing as t
class S_IF(enum.IntEnum):
SOCK = lib.S_IFSOCK
LNK = lib.S_IFLNK
REG = lib.S_IFREG
BLK = lib.S_IFBLK
DIR = lib.S_IFDIR
CHR = lib.S_IFCHR
IFO = lib.S_IFIFO
# mask with all set
MT = lib.S_IFMT
class Mode(int):
def __repr__(self) -> str:
return oct(self)
def __str__(self) -> str:
return repr(self)
@dataclass
class TypeMode:
# should change this to DT I guess?
# maybe have an as_dt?
type: S_IF
mode: Mode
@staticmethod
def from_int(val: int) -> TypeMode:
type_bits = val & S_IF.MT
return TypeMode(S_IF(type_bits), Mode(val ^ type_bits))
def __int__(self) -> int:
return self.type | self.mode
@dataclass
class Stat(Struct):
dev: int
ino: int
mode: TypeMode
nlink: int
uid: int
gid: int
rdev: int
size: int
blksize: int
blocks: int
atime: Timespec
mtime: Timespec
ctime: Timespec
def to_bytes(self) -> bytes:
return bytes(ffi.buffer(ffi.new('struct stat const*', {
"st_dev": self.dev,
"st_ino": self.ino,
"st_nlink": self.nlink,
"st_mode": self.mode,
"st_uid": self.uid,
"st_gid": self.gid,
"st_rdev": self.rdev,
"st_size": self.size,
"st_blksize": self.blksize,
"st_blocks": self.blocks,
"st_atim": self.atime._to_cffi_dict(),
"st_mtim": self.mtime._to_cffi_dict(),
"st_ctim": self.ctime._to_cffi_dict(),
})))
@property
def atim(self) -> Timespec:
return self.atime
@property
def mtim(self) -> Timespec:
return self.mtime
@property
def ctim(self) -> Timespec:
return self.ctime
T = t.TypeVar('T', bound='Stat')
@classmethod
def from_bytes(cls: t.Type[T], data: bytes) -> T:
struct = ffi.cast('struct stat*', ffi.from_buffer(data))
return cls(
dev=struct.st_dev,
ino=struct.st_ino,
mode=struct.st_mode,
nlink=struct.st_nlink,
uid=struct.st_uid,
gid=struct.st_gid,
rdev=struct.st_rdev,
size=struct.st_size,
blksize=struct.st_blksize,
blocks=struct.st_blocks,
atime=Timespec.from_cffi(struct.st_atim),
mtime=Timespec.from_cffi(struct.st_mtim),
ctime=Timespec.from_cffi(struct.st_ctim),
)
@classmethod
def sizeof(cls) -> int:
return ffi.sizeof('struct stat')
async def _fstat(sysif: SyscallInterface, fd: near.FileDescriptor, statbuf: near.Address) -> None:
await sysif.syscall(SYS.fstat, fd, statbuf)
class StatFileDescriptor(BaseFileDescriptor):
async def fstat(self, statbuf: Pointer[Stat]) -> Pointer[Stat]:
self._validate()
with statbuf.borrow(self.task):
await _fstat(self.task.sysif, self.near, statbuf.near)
return statbuf
#### Tests ####
from unittest import TestCase
class TestStat(TestCase):
def test_stat(self) -> None:
initial = Stat(
dev=0,
ino=0,
mode=TypeMode(S_IF.REG, Mode(0o777)),
nlink=0,
uid=0,
gid=0,
rdev=0,
size=0,
blksize=0,
blocks=0,
atime=Timespec(sec=0, nsec=0),
mtime=Timespec(sec=0, nsec=0),
ctime=Timespec(sec=0, nsec=0),
)
output = Stat.from_bytes(initial.to_bytes())
self.assertEqual(initial, output)