Skip to content

Commit

Permalink
Playing around with Mys.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed Mar 6, 2022
1 parent ffc6c1f commit ad30e16
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 168 deletions.
203 changes: 35 additions & 168 deletions examples/address_book/mys/src/address_book.mys
Original file line number Diff line number Diff line change
@@ -1,172 +1,9 @@
# This file was generated by pbtools.

class PbtoolsError(Error):
message: string

@enum
class WireType:
Varint = 0
Bits64 = 1
LengthDelimited = 2
Bits32 = 5

class Encoder:
pos: u64
buf: bytes

def __init__(self):
self.pos = 99
self.buf = bytes(100)

def write_enum(self, field_number: i64, value: i64):
self.write(field_number, i32(value))

def write(self, field_number: i64, value: string):
if len(value) == 0:
return

data = value.to_utf8()
self.write(data)
self.write_tagged_varint(field_number, WireType.LengthDelimited, len(data))

def write(self, field_number: i64, value: i32):
self.write_tagged_varint(field_number, WireType.Varint, u64(i64(value)))

def write(self, value: bytes):
self.pos -= len(value)

for i in range(i64(len(value))):
self.buf[i64(self.pos) + 1 + i] = value[i]

def write_length_delimited(self, field_number: i64, value: u64):
self.write_varint(value)
self.write_tag(field_number, WireType.LengthDelimited)

def write_tagged_varint(self, field_number: i64, wire_type: WireType, value: u64):
if value == 0:
return

self.write_varint(value)
self.write_tag(field_number, wire_type)

def write_varint(self, value: u64):
buf = b""

while True:
buf += u8(value | 0x80)
value >>= 7

if value == 0:
break

buf[-1] &= 0x7f
self.write(buf)

def write_tag(self, field_number: i64, wire_type: WireType):
self.write_varint(u64((field_number << 3) | i64(wire_type)))

def data(self) -> bytes:
r = b""

for i in range(i64(self.pos + 1), i64(len(self.buf))):
r += self.buf[i]

return r

class Decoder:
data: bytes
pos: i64

def __init__(self, data: bytes):
self.data = data
self.pos = 0

def available(self) -> bool:
return self.pos < i64(len(self.data))

def get(self) -> u8:
if not self.available():
raise PbtoolsError("Out of data.")

value = self.data[self.pos]
self.pos += 1

return value

def read_varint(self) -> u64:
value: u64 = 0
offset: u64 = 0
byte: u8 = 0x80

while (offset < 64) and ((byte & 0x80) == 0x80):
byte = self.get()
value |= ((u64(byte) & 0x7f) << offset)
offset += 7

if (byte & 0x80) == 0x80:
raise PbtoolsError("Varint overflow.")

return value

def read_varint_check_wire_type(self,
wire_type: WireType,
expected_wire_type: WireType) -> u64:
if wire_type != expected_wire_type:
raise PbtoolsError("Unecpected wire type.")

return self.read_varint()

def read_varint_check_wire_type_varint(self, wire_type: WireType) -> u64:
return self.read_varint_check_wire_type(wire_type, WireType.Varint)

def read_tag(self) -> (i64, WireType):
value = self.read_varint()
field_number = i64(value >> 3)

if field_number == 0:
raise PbtoolsError("Field number is zero.")

return (field_number, WireType(i64(value) & 7))

def read_length_delimited(self, wire_type: WireType) -> u64:
return self.read_varint_check_wire_type(wire_type, WireType.LengthDelimited)

def read_string(self, wire_type: WireType) -> string:
size = self.read_length_delimited(wire_type)

return string(self.read(size))

def read_i32(self, wire_type: WireType) -> i32:
return i32(self.read_i64(wire_type))

def read_i64(self, wire_type: WireType) -> i64:
return i64(self.read_varint_check_wire_type_varint(wire_type))

def read(self, size: u64) -> bytes:
data = b""

while size > 0:
data += self.get()
size -= 1

return data

def seek(self, offset: i64):
self.pos += offset

def skip_field(self, wire_type: WireType):
match wire_type:
case WireType.Varint:
self.read_varint()
case WireType.Bits64:
self.seek(8)
case WireType.LengthDelimited:
value = self.read_length_delimited(wire_type)
self.seek(i64(value))
case WireType.Bits32:
self.seek(4)
case _:
raise PbtoolsError("Bad wire type")
from .pbtools import Decoder
from .pbtools import Encoder
from .pbtools import PbtoolsError
from .pbtools import WireType

@enum
class PersonPhoneType:
Expand All @@ -178,16 +15,38 @@ class PersonPhoneNumber:
number: string
type: PersonPhoneType

def clear(self):
self.number = ""
self.type = PersonPhoneType.Mobile

def to_bytes_inner(self, encoder: Encoder):
encoder.write_enum(2, i64(self.type))
encoder.write(1, self.number)

def from_bytes_inner(self, decoder: Decoder):
while decoder.available():
field_number, wire_type = decoder.read_tag()

match field_number:
case 1:
self.number = decoder.read_string(wire_type)
case 2:
self.type = PersonPhoneType(i64(decoder.read_i32(wire_type)))
case _:
decoder.skip_field(wire_type)

class Person:
name: string
id: i32
email: string
phones: [PersonPhoneNumber]

def clear(self):
self.name = ""
self.id = 0
self.email = ""
self.phones = []

def to_bytes_inner(self, encoder: Encoder):
for phone in reversed(self.phones):
pos = encoder.pos
Expand All @@ -209,20 +68,28 @@ class Person:
self.id = decoder.read_i32(wire_type)
case 3:
self.email = decoder.read_string(wire_type)
case 4:
decoder.read_length_delimited(WireType.LengthDelimited)
phone = PersonPhoneNumber("", PersonPhoneType.Mobile)
phone.from_bytes_inner(decoder)
self.phones.append(phone)
case _:
decoder.skip_field(wire_type)

class AddressBook:
people: [Person]

def clear(self):
self.people.clear()

def to_bytes(self) -> bytes:
encoder = Encoder()
self.to_bytes_inner(encoder)

return encoder.data()

def from_bytes(self, data: bytes):
self.people.clear()
self.clear()
self.from_bytes_inner(Decoder(data))

def to_bytes_inner(self, encoder: Encoder):
Expand Down

0 comments on commit ad30e16

Please sign in to comment.