Skip to content

Commit

Permalink
Add more fields to events
Browse files Browse the repository at this point in the history
  • Loading branch information
mario4tier committed Mar 22, 2024
1 parent c3f186f commit 53bc483
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 68 deletions.
14 changes: 6 additions & 8 deletions move/sources/api.move
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ module dtp::api {
use dtp::pipe::{Pipe};
use dtp::inner_pipe::{InnerPipe};

use dtp::conn_objects::{ConnObjects};

// === Friends ===

// === Errors ===
Expand Down Expand Up @@ -76,30 +74,30 @@ module dtp::api {
// Transmit a request toward the server.
//
// The encoding of the 'data' depends on the service.
public fun send_request(service_idx: u8, data: &vector<u8>, ipipe: &mut InnerPipe, args: vector<u8>, ctx: &mut TxContext): vector<u8>
public fun send_request(ipipe: &mut InnerPipe, data: vector<u8>, args: vector<u8>, ctx: &mut TxContext): vector<u8>
{
let kvargs = kvalues::from_bytes(&args);
let ret_value = dtp::api_impl::send_request(service_idx, data, ipipe, &kvargs, ctx);
let ret_value = dtp::api_impl::send_request(ipipe, data, &kvargs, ctx);
kvalues::to_bytes(&ret_value)
}

// Transmit a response toward the client.
//
// The encoding of the 'data' depends on the service.
public fun send_response(service_idx: u8, data: &vector<u8>, seq_number: u64, ipipe: &mut InnerPipe, args: vector<u8>, ctx: &mut TxContext): vector<u8>
public fun send_response( ipipe: &mut InnerPipe, seq_num: u64, data: vector<u8>, args: vector<u8>, ctx: &mut TxContext): vector<u8>
{
let kvargs = kvalues::from_bytes(&args);
let ret_value = dtp::api_impl::send_response(service_idx, data, seq_number, ipipe, &kvargs, ctx);
let ret_value = dtp::api_impl::send_response(ipipe, seq_num, data, &kvargs, ctx);
kvalues::to_bytes(&ret_value)
}

// Transmit a notification toward the peer (no response expected).
//
// The encoding of the 'data' depends on the service.
public fun send_notification(service_idx: u8, data: &vector<u8>, ipipe: &mut InnerPipe, args: vector<u8>, ctx: &mut TxContext): vector<u8>
public fun send_notification(ipipe: &mut InnerPipe, data: &vector<u8>, args: vector<u8>, ctx: &mut TxContext): vector<u8>
{
let kvargs = kvalues::from_bytes(&args);
let ret_value = dtp::api_impl::send_notification(service_idx, data, ipipe,&kvargs, ctx);
let ret_value = dtp::api_impl::send_notification( ipipe, data,&kvargs, ctx);
kvalues::to_bytes(&ret_value)
}

Expand Down
37 changes: 29 additions & 8 deletions move/sources/api_impl.move
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ module dtp::api_impl {

use dtp::host::{Self,Host};
use dtp::transport_control::{Self};
use dtp::conn_objects::{Self,ConnObjects};
use dtp::conn_objects::{Self};
use dtp::transport_control::{TransportControl};
use dtp::pipe::{Pipe};
use dtp::inner_pipe::{Self,InnerPipe};
use dtp::kvalues::{Self,KValues};
use dtp::events::{Self};

use dtp::weak_ref::{Self};

Expand Down Expand Up @@ -58,9 +59,9 @@ module dtp::api_impl {
// Create the connection. Will emit an event on success
transport_control::create_best_effort(service_idx, cli_host, srv_host, &mut conn, ctx);

// TODO Add references in Host object for slow discovery.
//host::add_connection(cli_host, &conn.transport_control);
//host::add_connection(srv_host, &conn.transport_control);
// Add weak references in Host objects for slow discovery.
host::add_connection(cli_host, weak_ref::new_from_address(conn_objects::get_tc_address(&conn)));
host::add_connection(srv_host, weak_ref::new_from_address(conn_objects::get_tc_address(&conn)));

kvalues::new()
}
Expand Down Expand Up @@ -96,23 +97,43 @@ module dtp::api_impl {
// Transmit a request toward the server.
//
// The encoding of the 'data' depends on the service.
public(friend) fun send_request(_service_idx: u8, _data: &vector<u8>, _ipipe: &InnerPipe, _kvargs: &KValues, _ctx: &mut TxContext): KValues
{
public(friend) fun send_request(ipipe: &mut InnerPipe, data: vector<u8>, _kvargs: &KValues, _ctx: &mut TxContext): KValues
{
let seq_num = inner_pipe::inc_seq_num(ipipe);

// Emit a request event.
let ipipe_ref = weak_ref::new_from_obj(ipipe);
let tc_ref = inner_pipe::get_tc_ref(ipipe);
let service_idx = inner_pipe::get_service_idx(ipipe);
events::emit_request(service_idx, seq_num, tc_ref, ipipe_ref, data);

// Update stats for debugging.
inner_pipe::inc_emit_cnt(ipipe);

kvalues::new()
}

// Transmit a response toward the client.
//
// The encoding of the 'data' depends on the service.
public(friend) fun send_response(_service_idx: u8, _data: &vector<u8>, _seq_number: u64, _ipipe: &InnerPipe, _kvargs: &KValues, _ctx: &mut TxContext): KValues
public(friend) fun send_response(ipipe: &mut InnerPipe, seq_num: u64, data: vector<u8>, _kvargs: &KValues, _ctx: &mut TxContext): KValues
{
// Emit a response event.
let ipipe_ref = weak_ref::new_from_obj(ipipe);
let tc_ref = inner_pipe::get_tc_ref(ipipe);
let service_idx = inner_pipe::get_service_idx(ipipe);
events::emit_response(service_idx, seq_num, tc_ref, ipipe_ref, data);

// Update stats for debugging.
inner_pipe::inc_emit_cnt(ipipe);

kvalues::new()
}

// Transmit a notification toward the peer (no response expected).
//
// The encoding of the 'data' depends on the service.
public(friend) fun send_notification(_service_idx: u8, _data: &vector<u8>, _ipipe: &InnerPipe, _kvargs: &KValues, _ctx: &mut TxContext): KValues
public(friend) fun send_notification(_ipipe: &InnerPipe, _data: &vector<u8>, _kvargs: &KValues, _ctx: &mut TxContext): KValues
{
kvalues::new()
}
Expand Down
24 changes: 24 additions & 0 deletions move/sources/conn_objects.move
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module dtp::conn_objects {
// If an end-point loose these references, they can be
// re-discovered using one of the related Host object.
tc: address, // TransportControl
cli_auth: address,
srv_auth: address,
cli_tx_pipe: address,
srv_tx_pipe: address,
cli_tx_ipipes: vector<address>,
Expand All @@ -37,6 +39,8 @@ module dtp::conn_objects {
public(friend) fun new(): ConnObjects {
ConnObjects{
tc: @0x0,
cli_auth: @0x0,
srv_auth: @0x0,
cli_tx_pipe: @0x0,
srv_tx_pipe: @0x0,
cli_tx_ipipes: vector::empty(),
Expand All @@ -48,6 +52,26 @@ module dtp::conn_objects {
self.tc = tc;
}

public(friend) fun get_tc_address(self: &ConnObjects): address {
self.tc
}

public(friend) fun set_cli_auth(self: &mut ConnObjects, cli_auth: address) {
self.cli_auth = cli_auth;
}

public(friend) fun get_cli_auth_address(self: &ConnObjects): address {
self.cli_auth
}

public(friend) fun set_srv_auth(self: &mut ConnObjects, srv_auth: address) {
self.srv_auth = srv_auth;
}

public(friend) fun get_srv_auth_address(self: &ConnObjects): address {
self.srv_auth
}

public(friend) fun set_cli_tx_pipe(self: &mut ConnObjects, cli_tx_pipe: address) {
self.cli_tx_pipe = cli_tx_pipe;
}
Expand Down
38 changes: 32 additions & 6 deletions move/sources/events.move
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,38 @@ module dtp::events {
// === Imports ===
use sui::event;
use dtp::conn_objects::ConnObjects;
use dtp::weak_ref::WeakRef;

// === Friends ===
friend dtp::host;
friend dtp::inner_pipe;
friend dtp::transport_control;
friend dtp::transport_control;
friend dtp::api_impl;

// === Errors ===

// === Constants ===

// === Structs ===

// TODO Add KValues to all events for future proofing.
struct ConnReq has copy, drop {
service_idx: u8, // Service Type
conn: ConnObjects, // Info to get the connection started (e.g. Pipes and InnerPipes addresses).
flags: u8, // Reserve for future.
src: u8, // Typically 0x03 because coming from Host.
src_addr: address, // Host Address
service_idx: u8, // Service Type [1..253]
conn: ConnObjects, // Enough info to get the connection started (e.g. TC, Pipes and InnerPipes addresses).
}

struct Datagram has copy, drop {
flags: u8, // Reserve for future.
src: u8, // 0x01 or 0x02 for respectively cli_tx_ipipe and srv_tx_ipipe.
src_addr: address, // InnerPipe Address
service_idx: u8, // Service Type [1..253]
seq_num: u64,
tc_ref: WeakRef, // TransportControl Address.
data: vector<u8>, // The endpoint response/request (e.g. JSON-RPC).
}

// === Public-Mutative Functions ===

Expand All @@ -28,13 +44,23 @@ module dtp::events {
// === Admin Functions ===

// === Public-Friend Functions ===
public(friend) fun emit_conn_req( service_idx: u8, conn: ConnObjects ) {
event::emit(ConnReq { service_idx, conn });
public(friend) fun emit_conn_req( src_addr: address, service_idx: u8, conn: ConnObjects ) {
event::emit(ConnReq { flags: 0, src: 0x03, src_addr, service_idx, conn });
}

public(friend) fun emit_response( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector<u8> ) {
let src_addr = dtp::weak_ref::get_address(&ipipe_ref);
event::emit(Datagram { flags: 0, src: 0x02, src_addr, service_idx, seq_num, tc_ref, data });
}

public(friend) fun emit_request( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector<u8> ) {
let src_addr = dtp::weak_ref::get_address(&ipipe_ref);
event::emit(Datagram { flags: 0, src: 0x01, src_addr, service_idx, seq_num, tc_ref, data });
}

// === Private Functions ===

// === Test Functions ===


}
17 changes: 11 additions & 6 deletions move/sources/host.move
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ module dtp::host {
}
}

#[allow(lint(share_owned))]
//#[allow(lint(share_owned))]
public(friend) fun new_transfered( ctx: &mut TxContext ): WeakRef
{
let new_obj = dtp::host::new(ctx);
Expand All @@ -154,7 +154,7 @@ module dtp::host {
new_obj_ref
}

public(friend) fun upsert_service(self: &mut Host, service_id: u8, service_type: u8, _args: &KValues, ctx: &mut TxContext )
public(friend) fun upsert_service(self: &mut Host, service_id: u8, service_type: u8, _args: &KValues, _ctx: &mut TxContext )
{
/*if (!table::contains(&self.services, service_idx )) {
//assert!(table::contains(&self.services, service_idx) == false, 1);
Expand Down Expand Up @@ -185,14 +185,19 @@ module dtp::host {
uid_to_address(&self.id)
}

public(friend) fun authority(host: &Host): address {
host.authority
public(friend) fun authority(self: &Host): address {
self.authority
}

public(friend) fun is_caller_authority(host: &Host, ctx: &TxContext): bool {
tx_context::sender(ctx) == host.authority
public(friend) fun is_caller_authority(self: &Host, ctx: &TxContext): bool {
tx_context::sender(ctx) == self.authority
}

public(friend) fun add_connection(_self: &mut Host, _tc_ref: WeakRef ) {
// TODO Keep track of connections for slow discovery.
}


// === Private Functions ===

// === Test Functions ===
Expand Down
73 changes: 54 additions & 19 deletions move/sources/inner_pipe.move
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
module dtp::inner_pipe {

// === Imports ===
use sui::object::{Self, UID, uid_to_address};
use sui::object::{Self, UID, ID, uid_to_address};
use sui::transfer::{Self};
use sui::tx_context::{TxContext};
use dtp::weak_ref::{Self,WeakRef};
Expand All @@ -32,50 +32,85 @@ module dtp::inner_pipe {
struct InnerPipe has key, store {
id: UID,
flgs: u8, // DTP version+esc flags always after UID.

pipe_id: WeakRef,

service_idx: u8,
tc_ref: WeakRef,
pipe_ref: WeakRef,
sync_data: PipeSyncData,
seq_num: u64,
// Stats to help debugging.
emit_cnt: u64,
sync_cnt: u64,
}

// === Public-Mutative Functions ===
public entry fun send(
_self: &mut InnerPipe,
_data: vector<u8>,
_ctx: &mut TxContext )
{
// TODO Emit the event. Add sequential number logic.
}

// === Public-View Functions ===

// === Admin Functions ===

// === Public-Friend Functions ===

public(friend) fun new( pipe_address: &address, ctx: &mut TxContext ): InnerPipe {
public(friend) fun new( service_idx: u8, tc_id: &ID, pipe_addr: address, ctx: &mut TxContext ): InnerPipe {
let new_obj = InnerPipe {
id: object::new(ctx),
flgs: 0u8,
pipe_id: weak_ref::new_from_address(*pipe_address),
sync_data: pipe_sync_data::new(),
service_idx,
tc_ref: weak_ref::new(tc_id),
pipe_ref: weak_ref::new_from_address(pipe_addr),
sync_data: pipe_sync_data::new(),
seq_num: 1,
emit_cnt:0,
sync_cnt: 0,
};
new_obj
}

public(friend) fun new_transfered( pipe_address: &address, recipient: address, ctx: &mut TxContext ): WeakRef
public(friend) fun new_transfered( service_idx: u8, tc_id: &ID, pipe_addr: address, recipient: address, ctx: &mut TxContext ): address
{
let new_obj = new(pipe_address,ctx);
let new_obj_ref = weak_ref::new_from_address(uid_to_address(&new_obj.id));
let new_obj = new(service_idx, tc_id, pipe_addr, ctx);
let new_obj_addr = uid_to_address(&new_obj.id);
transfer::transfer(new_obj, recipient);
new_obj_ref
new_obj_addr
}

public(friend) fun delete( self: InnerPipe ) {
let InnerPipe { id, flgs: _, pipe_id: _, sync_data: _ } = self;
let InnerPipe { id, flgs: _, service_idx: _,
tc_ref: _, pipe_ref: _,
sync_data: _,
seq_num: _ ,
emit_cnt: _, sync_cnt: _
} = self;

object::delete(id);
}

public(friend) fun inc_seq_num( self: &mut InnerPipe ): u64 {
self.seq_num = self.seq_num + 1;
self.seq_num
}

public(friend) fun inc_emit_cnt( self: &mut InnerPipe ): u64 {
self.emit_cnt = self.emit_cnt + 1;
self.emit_cnt
}

public(friend) fun inc_sync_cnt( self: &mut InnerPipe ): u64 {
self.sync_cnt = self.sync_cnt + 1;
self.sync_cnt
}

public(friend) fun get_address( self: &InnerPipe ): address {
weak_ref::get_address(&self.tc_ref )
}

public (friend) fun get_tc_ref( self: &InnerPipe ): WeakRef {
self.tc_ref
}

public (friend) fun get_service_idx( self: &InnerPipe ): u8 {
self.service_idx
}

// === Private Functions ===

// === Test Functions ===
Expand Down
Loading

0 comments on commit 53bc483

Please sign in to comment.