In [1]:
import pdb
import os
import sys
from ctypes import (
    CDLL,
    POINTER,
    Structure,
    byref,
    string_at,
    c_char_p,
    c_int32,
    c_int64,
    c_uint64,
    c_ubyte,
)

from ctypes.util import find_library
from typing import Optional, Union

# Module-level cache for the loaded shared library. It starts out as None and
# is filled in by _get_library() the first time a native function is needed.
LIB: Optional[CDLL] = None

class FfiByteBuffer(Structure):
    """ctypes structure pairing a byte count with a pointer to the bytes.

    Instances are created on the Python side and handed to native functions,
    either by value (as an input argument) or by reference (as an output
    slot).

    NOTE(review): the previous docstring said the buffer is "allocated by
    python"; for output buffers the native side presumably owns the memory
    behind ``data`` -- confirm against the library's FFI contract.
    """
    # The order and C types of these fields define the struct's memory layout
    # as seen by the native library; do not reorder or retype them.
    _fields_ = [
        ("length", c_int64),  # number of bytes readable through ``data``
        ("data", POINTER(c_ubyte)),  # pointer to the first byte (may be NULL)
    ]


class FfiError(Structure):
    """ctypes structure carrying an integer code and a C string message.

    The wrappers in this file create a zero-initialised instance and pass it
    by reference to each native call so the library can report a failure.

    NOTE(review): the previous docstring said the error is "allocated by
    python"; the ``message`` string is presumably allocated by the native
    side -- confirm who is responsible for releasing it.
    """
    # The order and C types of these fields define the struct's memory layout
    # as seen by the native library; do not reorder or retype them.
    _fields_ = [
        ("code", c_int32),  # numeric status; 0 on a freshly created instance
        ("message", c_char_p),  # NUL-terminated text, or None when unset
    ]


def _decode_bytes(
    arg: Optional[Union[str, bytes, bytearray, memoryview, FfiByteBuffer]]
) -> Union[bytes, bytearray]:
    """Convert any supported byte-like input into Python bytes.

    Args:
        arg: One of
            * ``FfiByteBuffer`` -- ``length`` bytes are copied out of ``data``;
            * ``memoryview``    -- the viewed bytes are copied;
            * ``bytearray``     -- returned as-is (same object, no copy);
            * ``bytes``         -- returned as-is;
            * ``str``           -- encoded as UTF-8;
            * ``None`` or any other type -- treated as "no data".

    Returns:
        The decoded bytes. An empty ``bytearray`` is returned for ``None`` and
        for unsupported types, as before.
    """
    if isinstance(arg, FfiByteBuffer):
        # A NULL pointer or a non-positive length carries no data. Guarding
        # here also keeps a negative length from reaching string_at(), where
        # -1 means "read until the first NUL byte".
        if not arg.data or arg.length <= 0:
            return b""
        return string_at(arg.data, arg.length)
    if isinstance(arg, memoryview):
        # tobytes() honours slicing/strides and works for any backing object,
        # unlike reading ``nbytes`` bytes from the start of ``arg.obj``.
        return arg.tobytes()
    if isinstance(arg, (bytearray, bytes)):
        # bytes previously fell through to the empty result below.
        return arg
    if isinstance(arg, str):
        return arg.encode("utf-8")
    return bytearray()


def _encode_bytes(
    arg: Optional[Union[str, bytes, bytearray, memoryview, FfiByteBuffer]]
) -> FfiByteBuffer:
    """Wrap a Python value in an ``FfiByteBuffer`` suitable for a native call.

    Args:
        arg: One of
            * ``FfiByteBuffer`` -- returned unchanged;
            * ``memoryview``    -- shared without copying when it is writable
              and C-contiguous, otherwise copied;
            * ``bytearray``     -- shared without copying;
            * ``str``           -- encoded as UTF-8, then copied;
            * ``bytes`` (or another sized buffer object) -- copied;
            * ``None``          -- yields an empty buffer.

    Returns:
        A buffer whose ``length`` is the byte count and whose ``data`` points
        at the bytes. ``data`` is left NULL when there are no bytes.

    Note:
        When memory is shared (``from_buffer``), the source object stays
        exported for as long as the returned buffer is alive, so a source
        ``bytearray`` cannot be resized during that time.
    """
    if isinstance(arg, FfiByteBuffer):
        return arg
    buf = FfiByteBuffer()
    if isinstance(arg, memoryview):
        buf.length = arg.nbytes
        if buf.length > 0:
            array_type = c_ubyte * buf.length
            if arg.c_contiguous and not arg.readonly:
                # Export the view itself (not ``arg.obj``) so that any slice
                # offset and the exact byte count are respected.
                buf.data = array_type.from_buffer(arg)
            else:
                # tobytes() flattens read-only and non-contiguous views into
                # a contiguous copy that from_buffer_copy() can consume.
                buf.data = array_type.from_buffer_copy(arg.tobytes())
    elif isinstance(arg, bytearray):
        buf.length = len(arg)
        if buf.length > 0:
            buf.data = (c_ubyte * buf.length).from_buffer(arg)
    elif arg is not None:
        if isinstance(arg, str):
            arg = arg.encode("utf-8")
        buf.length = len(arg)
        if buf.length > 0:
            buf.data = (c_ubyte * buf.length).from_buffer_copy(arg)
    return buf


def _load_library(lib_name: str) -> CDLL:
    lib_prefix_mapping = {"win32": ""}
    lib_suffix_mapping = {"darwin": ".dylib", "win32": ".dll"}
    try:
        os_name = sys.platform
        lib_prefix = lib_prefix_mapping.get(os_name, "lib")
        lib_suffix = lib_suffix_mapping.get(os_name, ".so")
        lib_path = os.path.join(
            os.path.dirname(os.getcwd()), f"agora-allosaurus-rs/target/release/{lib_prefix}{lib_name}{lib_suffix}"
        )
        return CDLL(lib_path)
    except KeyError:
        print ("Unknown platform for shared library")
    except OSError:
        print ("Library not loaded from python package")

    lib_path = find_library(lib_name)
    if not lib_path:
        if sys.platform == "darwin":
            ld = os.getenv("DYLD_LIBRARY_PATH")
            lib_path = os.path.join(ld, "liboberon.dylib")
            if os.path.exists(lib_path):
                return CDLL(lib_path)

            ld = os.getenv("DYLD_FALLBACK_LIBRARY_PATH")
            lib_path = os.path.join(ld, "liboberon.dylib")
            if os.path.exists(lib_path):
                return CDLL(lib_path)
        elif sys.platform != "win32":
            ld = os.getenv("LD_LIBRARY_PATH")
            lib_path = os.path.join(ld, "liboberon.so")
            if os.path.exists(lib_path):
                return CDLL(lib_path)

        raise Exception(f"Error loading library: {lib_name}")
    try:
        return CDLL(lib_path)
    except OSError as e:
        raise Exception(f"Error loading library: {lib_name}")


def _get_library() -> CDLL:
    """Return the cached library handle, loading it on the first call."""
    global LIB
    if LIB is not None:
        # Already loaded by an earlier call.
        return LIB
    LIB = _load_library("agora_allosaurus_rs")
    return LIB

def _get_func(fn_name: str):
    """Fetch the attribute called ``fn_name`` from the loaded library."""
    library = _get_library()
    return getattr(library, fn_name)

def _free_buffer(buffer: FfiByteBuffer):
    """Call the library's ``allosaurus_byte_buffer_free`` with a reference to ``buffer``."""
    release = _get_func("allosaurus_byte_buffer_free")
    buffer_ref = byref(buffer)
    release(buffer_ref)


def _free_string(err: FfiError):
    """Call the library's ``allosaurus_string_free`` with a reference to ``err``."""
    release = _get_func("allosaurus_string_free")
    err_ref = byref(err)
    release(err_ref)


def _free_handle(handle: c_int64, err: FfiError):
    """Call the library's ``allosaurus_create_proof_free`` with ``handle`` and a reference to ``err``."""
    release = _get_func("allosaurus_create_proof_free")
    err_ref = byref(err)
    release(handle, err_ref)

In [2]:
def new_server() -> c_uint64:
    """Create a server through ``allosaurus_new_server`` and return its handle.

    Returns:
        The handle wrapped in ``c_uint64``, so that it is passed to later
        native calls as a full 64-bit value.

    Raises:
        Exception: If the library returns a zero handle. The exception text is
            the library's error message when one was provided, otherwise a
            generic description that includes ``err.code``.
    """
    err = FfiError()
    lib_fn = _get_func("allosaurus_new_server")
    # Without an explicit restype ctypes would treat the result as a C int
    # and truncate the 64-bit handle.
    lib_fn.restype = c_uint64

    handle = lib_fn(byref(err))
    if handle == 0:
        # Reading a c_char_p field already yields bytes (or None). Calling
        # string_at() on None would dereference a NULL pointer.
        raw_message = err.message
        if raw_message:
            message = raw_message.decode("utf-8", errors="replace")
        else:
            message = f"allosaurus_new_server failed (code {err.code})"
        # NOTE(review): the native message string is not released here;
        # confirm whether _free_string(err) should be called.
        raise Exception(message)
    return c_uint64(handle)
# Create one server handle for the rest of the notebook and display it.
server = new_server()
server

c_ulong(4707266925007208450)

In [3]:
def new_user(server) -> bytes:
    """Create a user through ``allosaurus_new_user`` and return it as bytes.

    Args:
        server: Server handle as returned by ``new_server()``. A plain
            ``int`` is also accepted and is wrapped in ``c_uint64`` first.

    Returns:
        The bytes the library wrote into the output buffer.

    Raises:
        Exception: If the library reported an error code or left the output
            buffer empty.
    """
    buffer = FfiByteBuffer()
    err = FfiError()
    lib_fn = _get_func("allosaurus_new_user")
    # ctypes passes a bare Python int as a C int, which would truncate a
    # 64-bit handle; wrap it explicitly.
    handle = c_uint64(server) if isinstance(server, int) else server
    lib_fn(handle, byref(buffer), byref(err))

    # A ctypes Structure instance is always truthy, so ``not buffer`` could
    # never detect a failure. Inspect the fields instead.
    # NOTE(review): assumes the library leaves ``err.code`` at 0 on success
    # and always produces a non-empty buffer -- confirm against its FFI docs.
    if err.code != 0 or not buffer.data:
        raw_message = err.message
        if raw_message:
            message = raw_message.decode("utf-8", errors="replace")
        else:
            message = f"allosaurus_new_user failed (code {err.code})"
        raise Exception(message)
    # _decode_bytes copies the data out of native memory.
    # NOTE(review): the native buffer is not released afterwards; confirm
    # whether _free_buffer(buffer) should be called here.
    return _decode_bytes(buffer)
# Create a user against the server handle from the earlier cell and display the returned bytes.
new_user(server)

b'1\xc1\x05PsC<v\x13\xa2g\\[Z\x8e=;m!\xed\xf4<%\x1c\x98\xbc\xe9\xe7\x15\xb4\t\x00\xb9W\xfdX9\xcdYB\x172\xcc\x12>\x05\xbe\xe7\xd6@LO\xed\xce!\xc1,\'P\xb3prt\xc4b{\xf3o\xce\xc0vTI\xb9\xfc"\x8e\x03e\x90\x8aT\xfc\xdf\xb0A& \x8f\x91JIx\xb2>\x92~\xc3Li\xf6\x04k7\xbf\x9b\xd2\rc\x87(\xaa\x00D\x15[\x19t\x92=1\xab\xbflf\xb7D\x11\x11"\x16lJ\x18\xbc3\xda!`\xbf-X\x08\x83\xd7\xe5\x9b\x87\x0b\xf6A\xa2\x8f\xbd\x13\x17/\xdd\xd5\xa8\xa3Q\x9f2B\xc4\x7f\x1e\x8c\xafrZ\xab\xf2\x98\x19\x89\xe4\xf3\xa0\x8c<r\x02\xf8<\xb9\x9d`\xcd\xbd\xa8\x86\x88Z/\x05\xb3p\xf3\tzY\xad~\xd3jA\xfd\xed\xacn\x03~b\xa2Ru\xbfW\x11\x91\xac\xd8\x00x]\x15\x16K\xa0\xe6L\x12yC\xa2\xe4\x94\xae\\O\x8e\xb8d\x12\xa8\xa0\x0c\x13\x81\x1e~\x11\x1e\xc8,\xa8\x0c\x11.\xc3f1\xff>\x9b7\xe1\x9a\x9d*\x00\x00\x00\x00\x00\x00\x00\x01'

In [4]:
def server_add(server, user) -> bytes:
    """Call ``allosaurus_server_add`` for ``user`` and return the result bytes.

    Args:
        server: Server handle as returned by ``new_server()``. A plain
            ``int`` is also accepted and is wrapped in ``c_uint64`` first.
        user: The user, in any form ``_encode_bytes`` accepts (for example
            the bytes returned by ``new_user``).

    Returns:
        The bytes the library wrote into the output buffer.

    Raises:
        Exception: If the library reported an error code or left the output
            buffer empty.
    """
    buffer = FfiByteBuffer()
    err = FfiError()
    lib_fn = _get_func("allosaurus_server_add")
    # ctypes passes a bare Python int as a C int, which would truncate a
    # 64-bit handle; wrap it explicitly.
    handle = c_uint64(server) if isinstance(server, int) else server
    # Keep a named reference so the encoded input stays alive for the call.
    user_buffer = _encode_bytes(user)
    lib_fn(handle, user_buffer, byref(buffer), byref(err))

    # A ctypes Structure instance is always truthy, so ``not buffer`` could
    # never detect a failure. Inspect the fields instead.
    # NOTE(review): assumes the library leaves ``err.code`` at 0 on success
    # and always produces a non-empty buffer -- confirm against its FFI docs.
    if err.code != 0 or not buffer.data:
        raw_message = err.message
        if raw_message:
            message = raw_message.decode("utf-8", errors="replace")
        else:
            message = f"allosaurus_server_add failed (code {err.code})"
        raise Exception(message)
    # _decode_bytes copies the data out of native memory.
    # NOTE(review): the native buffer is not released afterwards; confirm
    # whether _free_buffer(buffer) should be called here.
    return _decode_bytes(buffer)
# Create a fresh user and pass it straight to server_add; the cell displays the return value.
server_add(server, new_user(server))

: 