-
Notifications
You must be signed in to change notification settings - Fork 11
/
binon.py
135 lines (123 loc) · 3.47 KB
/
binon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from rpython.rlib.mutbuffer import MutableStringBuffer
from rpython.rlib.rstruct import ieee
from rpython.rlib import rfile
from bon import open_file
from space import *
import pathobj, os
def write_file(pathname, data):
name = pathobj.os_stringify(pathname).encode('utf-8')
try:
fd = rfile.create_file(name, "wb")
try:
dump(fd, data)
finally:
fd.close()
except IOError as error:
message = os.strerror(error.errno).decode('utf-8')
raise OldError(u"%s: %s" % (pathobj.stringify(pathname), message))
return null
def dump(fd, data):
tp = get_interface(data)
if tp is Integer.interface:
fd.write(chr(0))
wlong(fd, data)
elif tp is Float.interface:
fd.write(chr(1))
wdouble(fd, data)
elif tp is String.interface:
fd.write(chr(2))
wstring(fd, data)
elif tp is List.interface:
fd.write(chr(3))
wlist(fd, data)
elif tp is Dict.interface:
fd.write(chr(4))
wdict(fd, data)
elif tp is Uint8Array.interface or tp is Uint8Slice.interface: # TODO: consider this again.
fd.write(chr(5))
wbytes(fd, data)
elif tp is Boolean.interface:
fd.write(chr(6))
wboolean(fd, data)
elif tp is null:
fd.write(chr(7))
elif tp is FloatRepr.interface:
assert isinstance(data, FloatRepr)
fd.write(chr(8))
wstring(fd, data.string)
else:
raise OldError(u"no binon encoding found for an object: " + data.repr())
def w32(fd, value):
fd.write(''.join([
chr(value >> 24 & 255),
chr(value >> 16 & 255),
chr(value >> 8 & 255),
chr(value >> 0 & 255),
]))
def wlong(fd, num):
"http://en.wikipedia.org/wiki/Variable-length_quantity"
assert isinstance(num, Integer)
value = num.value
output = []
if value < 0:
negative = True
value = -value
else:
negative = False
output.append(value & 0x7F)
while value > 0x7F:
value >>= 7
output.append(0x80 | value & 0x7F)
if output[-1] & 0x40 != 0:
output.append(0x80)
if negative:
output[-1] |= 0x40
result = []
for x in reversed(output):
result.append(chr(x))
fd.write(''.join(result))
def wdouble(fd, obj):
assert isinstance(obj, Float)
result = MutableStringBuffer(8)
ieee.pack_float(result, 0, obj.number, 8, True)
val = result.finish()
assert val is not None
fd.write(val)
def wstring(fd, obj):
assert isinstance(obj, String)
s = obj.string.encode('utf-8')
w32(fd, len(s))
fd.write(s)
def wlist(fd, obj):
assert isinstance(obj, List)
w32(fd, len(obj.contents))
for o in obj.contents:
dump(fd, o)
def wdict(fd, obj):
assert isinstance(obj, Dict)
w32(fd, len(obj.data))
for key, value in obj.data.items():
dump(fd, key)
dump(fd, value)
def wbytes(fd, obj):
assert isinstance(obj, Uint8Data)
w32(fd, obj.length)
result = []
for i in range(obj.length):
result.append(chr(obj.uint8data[i]))
fd.write(''.join(result))
def wboolean(fd, obj):
if is_true(obj):
return fd.write('\x01')
else:
return fd.write('\x00')
def wnull(fd, obj):
pass
module = Module(u'binon', {
u"read_file": Builtin(
signature(Object)(open_file),
u"read_file"),
u"write_file": Builtin(
signature(Object, Object)(write_file),
u"write_file"),
}, frozen=True)