Skip to content

Commit

Permalink
(#12) Add response logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mario4tier committed Mar 24, 2024
1 parent 53bc483 commit e976186
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 78 deletions.
8 changes: 4 additions & 4 deletions move/sources/api.move
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,20 @@ module dtp::api {
// Transmit a request toward the server.
//
// The encoding of the 'data' depends on the service.
public fun send_request(ipipe: &mut InnerPipe, data: vector<u8>, args: vector<u8>, ctx: &mut TxContext): vector<u8>
public fun send_request(ipipe: &mut InnerPipe, data: vector<u8>, cid: u64, args: vector<u8>, ctx: &mut TxContext): vector<u8>
{
let kvargs = kvalues::from_bytes(&args);
let ret_value = dtp::api_impl::send_request(ipipe, data, &kvargs, ctx);
let ret_value = dtp::api_impl::send_request(ipipe, data, cid, &kvargs, ctx);
kvalues::to_bytes(&ret_value)
}

// Transmit a response toward the client.
//
// The encoding of the 'data' depends on the service.
public fun send_response( ipipe: &mut InnerPipe, seq_num: u64, data: vector<u8>, args: vector<u8>, ctx: &mut TxContext): vector<u8>
public fun send_response( ipipe: &mut InnerPipe, req_ipipe_idx: u8, req_seq_num: u64, data: vector<u8>, cid: u64, args: vector<u8>, ctx: &mut TxContext): vector<u8>
{
let kvargs = kvalues::from_bytes(&args);
let ret_value = dtp::api_impl::send_response(ipipe, seq_num, data, &kvargs, ctx);
let ret_value = dtp::api_impl::send_response(ipipe, req_ipipe_idx, req_seq_num, data, cid, &kvargs, ctx);
kvalues::to_bytes(&ret_value)
}

Expand Down
21 changes: 15 additions & 6 deletions move/sources/api_impl.move
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,20 @@ module dtp::api_impl {
// Transmit a request toward the server.
//
// The encoding of the 'data' depends on the service.
public(friend) fun send_request(ipipe: &mut InnerPipe, data: vector<u8>, _kvargs: &KValues, _ctx: &mut TxContext): KValues
public(friend) fun send_request(ipipe: &mut InnerPipe, data: vector<u8>, cid: u64, _kvargs: &KValues, _ctx: &mut TxContext): KValues
{
let seq_num = inner_pipe::inc_seq_num(ipipe);
let req_seq_num = inner_pipe::inc_seq_num(ipipe);
let req_ipipe_idx = inner_pipe::get_ipipe_idx(ipipe);

// Emit a request event.
let cli_host_ref = inner_pipe::get_cli_host_ref(ipipe);
let srv_host_ref = inner_pipe::get_srv_host_ref(ipipe);
let ipipe_ref = weak_ref::new_from_obj(ipipe);
let peer_ipipe_ref = inner_pipe::get_peer_ref(ipipe);
let tc_ref = inner_pipe::get_tc_ref(ipipe);
let service_idx = inner_pipe::get_service_idx(ipipe);
events::emit_request(service_idx, seq_num, tc_ref, ipipe_ref, data);
let service_idx = inner_pipe::get_service_idx(ipipe);

events::emit_request(service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, ipipe_ref, peer_ipipe_ref, data, cid );

// Update stats for debugging.
inner_pipe::inc_emit_cnt(ipipe);
Expand All @@ -116,13 +121,17 @@ module dtp::api_impl {
// Transmit a response toward the client.
//
// The encoding of the 'data' depends on the service.
public(friend) fun send_response(ipipe: &mut InnerPipe, seq_num: u64, data: vector<u8>, _kvargs: &KValues, _ctx: &mut TxContext): KValues
public(friend) fun send_response(ipipe: &mut InnerPipe, req_ipipe_idx: u8, req_seq_num: u64, data: vector<u8>, cid: u64, _kvargs: &KValues, _ctx: &mut TxContext): KValues
{
// Emit a response event.
let cli_host_ref = inner_pipe::get_cli_host_ref(ipipe);
let srv_host_ref = inner_pipe::get_srv_host_ref(ipipe);
let ipipe_ref = weak_ref::new_from_obj(ipipe);
let peer_ipipe_ref = inner_pipe::get_peer_ref(ipipe);
let tc_ref = inner_pipe::get_tc_ref(ipipe);
let service_idx = inner_pipe::get_service_idx(ipipe);
events::emit_response(service_idx, seq_num, tc_ref, ipipe_ref, data);

events::emit_response(service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, ipipe_ref, peer_ipipe_ref, data, cid);

// Update stats for debugging.
inner_pipe::inc_emit_cnt(ipipe);
Expand Down
21 changes: 13 additions & 8 deletions move/sources/events.move
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ module dtp::events {
struct Datagram has copy, drop {
flags: u8, // Reserve for future.
src: u8, // 0x01 or 0x02 for respectively cli_tx_ipipe and srv_tx_ipipe.
src_addr: address, // InnerPipe Address
src_addr: address, // InnerPipe address emitting the event.
service_idx: u8, // Service Type [1..253]
seq_num: u64,
tc_ref: WeakRef, // TransportControl Address.
req_ipipe_idx: u8, // Uniquely identifies the originating request ipipe with an index [0..n_req_pipe-1]
req_seq_num: u64, // Sequence number assigned by the originating ipipe.
cli_host_ref: WeakRef, // Client Host Address (Optimization to minimize lookup at receiver).
srv_host_ref: WeakRef, // Server Host Address (Optimization to minimize lookup at receiver).
tc_ref: WeakRef, // TransportControl Address
peer_ipipe_ref: WeakRef, // InnerPipe in the other direction.
data: vector<u8>, // The endpoint response/request (e.g. JSON-RPC).
cid: u64, // Correlation ID from the originating request endpoint.
}

// === Public-Mutative Functions ===
Expand All @@ -48,19 +53,19 @@ module dtp::events {
event::emit(ConnReq { flags: 0, src: 0x03, src_addr, service_idx, conn });
}

public(friend) fun emit_response( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector<u8> ) {
public(friend) fun emit_response( service_idx: u8, req_ipipe_idx: u8, req_seq_num: u64, cli_host_ref: WeakRef, srv_host_ref: WeakRef, tc_ref: WeakRef, ipipe_ref: WeakRef, peer_ipipe_ref: WeakRef, data: vector<u8>, cid: u64 ) {
let src_addr = dtp::weak_ref::get_address(&ipipe_ref);
event::emit(Datagram { flags: 0, src: 0x02, src_addr, service_idx, seq_num, tc_ref, data });
event::emit(Datagram { flags: 0, src: 0x02, src_addr, service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, peer_ipipe_ref, data, cid });
}

public(friend) fun emit_request( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector<u8> ) {
public(friend) fun emit_request( service_idx: u8, req_ipipe_idx: u8, req_seq_num: u64, cli_host_ref: WeakRef, srv_host_ref: WeakRef, tc_ref: WeakRef, ipipe_ref: WeakRef, peer_ipipe_ref: WeakRef, data: vector<u8>, cid: u64 ) {
let src_addr = dtp::weak_ref::get_address(&ipipe_ref);
event::emit(Datagram { flags: 0, src: 0x01, src_addr, service_idx, seq_num, tc_ref, data });
event::emit(Datagram { flags: 0, src: 0x01, src_addr, service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, peer_ipipe_ref, data, cid });
}

// === Private Functions ===

// === Test Functions ===


}
54 changes: 47 additions & 7 deletions move/sources/inner_pipe.move
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ module dtp::inner_pipe {
struct InnerPipe has key, store {
id: UID,
flgs: u8, // DTP version+esc flags always after UID.
service_idx: u8,
ipipe_idx: u8,
service_idx: u8,
cli_host_ref: WeakRef,
srv_host_ref: WeakRef,
tc_ref: WeakRef,
pipe_ref: WeakRef,
peer_pipe_ref: WeakRef,
sync_data: PipeSyncData,
seq_num: u64,
// Stats to help debugging.
Expand All @@ -50,13 +54,17 @@ module dtp::inner_pipe {

// === Public-Friend Functions ===

public(friend) fun new( service_idx: u8, tc_id: &ID, pipe_addr: address, ctx: &mut TxContext ): InnerPipe {
public(friend) fun new( ipipe_idx: u8, service_idx: u8, cli_id: &ID, srv_id: &ID, tc_id: &ID, pipe_addr: address, ctx: &mut TxContext ): InnerPipe {
let new_obj = InnerPipe {
id: object::new(ctx),
flgs: 0u8,
ipipe_idx,
service_idx,
cli_host_ref: weak_ref::new(cli_id),
srv_host_ref: weak_ref::new(srv_id),
tc_ref: weak_ref::new(tc_id),
pipe_ref: weak_ref::new_from_address(pipe_addr),
peer_pipe_ref: weak_ref::new_empty(),
sync_data: pipe_sync_data::new(),
seq_num: 1,
emit_cnt:0,
Expand All @@ -65,17 +73,20 @@ module dtp::inner_pipe {
new_obj
}

public(friend) fun new_transfered( service_idx: u8, tc_id: &ID, pipe_addr: address, recipient: address, ctx: &mut TxContext ): address
public(friend) fun new_transfered( ipipe_idx: u8, service_idx: u8, cli_id: &ID, srv_id: &ID, tc_id: &ID, pipe_addr: address, recipient: address, ctx: &mut TxContext ): address
{
let new_obj = new(service_idx, tc_id, pipe_addr, ctx);
let new_obj = new( ipipe_idx, service_idx, cli_id, srv_id, tc_id, pipe_addr, ctx);
let new_obj_addr = uid_to_address(&new_obj.id);
transfer::transfer(new_obj, recipient);
new_obj_addr
}

public(friend) fun delete( self: InnerPipe ) {
let InnerPipe { id, flgs: _, service_idx: _,
let InnerPipe { id, flgs: _,
ipipe_idx: _, service_idx: _,
cli_host_ref: _, srv_host_ref: _,
tc_ref: _, pipe_ref: _,
peer_pipe_ref: _,
sync_data: _,
seq_num: _ ,
emit_cnt: _, sync_cnt: _
Expand Down Expand Up @@ -103,14 +114,43 @@ module dtp::inner_pipe {
weak_ref::get_address(&self.tc_ref )
}

public (friend) fun get_tc_ref( self: &InnerPipe ): WeakRef {
public(friend) fun get_cli_host_ref( self: &InnerPipe ): WeakRef {
self.cli_host_ref
}

public(friend) fun get_srv_host_ref( self: &InnerPipe ): WeakRef {
self.srv_host_ref
}

public(friend) fun get_tc_ref( self: &InnerPipe ): WeakRef {
self.tc_ref
}

public (friend) fun get_service_idx( self: &InnerPipe ): u8 {
public(friend) fun get_peer_ref( self: &InnerPipe ): WeakRef {
self.peer_pipe_ref
}

public(friend) fun get_service_idx( self: &InnerPipe ): u8 {
self.service_idx
}

public(friend) fun get_ipipe_idx( self: &InnerPipe ): u8 {
self.ipipe_idx
}

public(friend) fun get_ipipe_address( self: &InnerPipe ): address {
uid_to_address(&self.id)
}

public(friend) fun set_peer_ref( self: &mut InnerPipe, peer_pipe: &InnerPipe ) {
weak_ref::set( &mut self.peer_pipe_ref, object::borrow_id<InnerPipe>(peer_pipe));
}

public(friend) fun transfer( self: InnerPipe, recipient: address ) {
transfer::transfer(self, recipient);
}


// === Private Functions ===

// === Test Functions ===
Expand Down
92 changes: 48 additions & 44 deletions move/sources/pipe.move
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ module dtp::pipe {
id: UID,
flgs: u8, // DTP version+esc flags always after UID.
service_idx: u8,
cli_host_ref: WeakRef,
srv_host_ref: WeakRef,
tc_ref: WeakRef, // TransportControl
ipipe_refs: vector<WeakRef>, // InnerPipe(s)
sync_data: PipeSyncData, // Merged of all InnerPipe sync_data.
Expand All @@ -49,67 +51,69 @@ module dtp::pipe {

// === Public-Friend Functions ===

public(friend) fun new_transfered( service_idx: u8, tc_id: &ID, ipipe_count: u8, recipient: address, is_cli_tx_pipe: bool, conn: &mut ConnObjects, ctx: &mut TxContext ): address {
// Create two pipes at once
public(friend) fun new_pipes( service_idx: u8, cli_id: &ID, srv_id: &ID, tc_id: &ID, ipipe_count: u8, cli_recipient: address, srv_recipient: address, conn: &mut ConnObjects, ctx: &mut TxContext ): (Pipe,Pipe) {
assert!(ipipe_count > 0, errors::EInvalidPipeCount());

let new_pipe = Pipe {
let cli_pipe = Pipe {
id: object::new(ctx),
flgs: 0,
service_idx,
cli_host_ref: weak_ref::new(cli_id),
srv_host_ref: weak_ref::new(srv_id),
tc_ref: weak_ref::new(tc_id),
ipipe_refs: vector::empty(),
sync_data: pipe_sync_data::new(),
};

let pipe_addr = uid_to_address(&new_pipe.id);
if (is_cli_tx_pipe) {
conn_objects::set_cli_tx_pipe(conn, pipe_addr);
} else {
conn_objects::set_srv_tx_pipe(conn, pipe_addr);
let srv_pipe = Pipe {
id: object::new(ctx),
flgs: 0,
service_idx,
cli_host_ref: weak_ref::new(cli_id),
srv_host_ref: weak_ref::new(srv_id),
tc_ref: weak_ref::new(tc_id),
ipipe_refs: vector::empty(),
sync_data: pipe_sync_data::new(),
};


let cli_pipe_addr = uid_to_address(&cli_pipe.id);
conn_objects::set_cli_tx_pipe(conn, cli_pipe_addr);

let srv_pipe_addr = uid_to_address(&srv_pipe.id);
conn_objects::set_srv_tx_pipe(conn, srv_pipe_addr);

// Create InnerPipes.
let i: u8 = 0;
while (i < ipipe_count) {
let ipipe_addr = inner_pipe::new_transfered(service_idx, tc_id, pipe_addr, recipient, ctx);
let ipipe_idx: u8 = 0;
while (ipipe_idx < ipipe_count) {
let cli_ipipe= inner_pipe::new(ipipe_idx, service_idx, cli_id, srv_id, tc_id, cli_pipe_addr, ctx);
let srv_ipipe= inner_pipe::new(ipipe_idx, service_idx, cli_id, srv_id, tc_id, srv_pipe_addr, ctx);

// Cross-reference the inner pipes.
inner_pipe::set_peer_ref(&mut cli_ipipe, &srv_ipipe);
inner_pipe::set_peer_ref(&mut srv_ipipe, &cli_ipipe);

// Save WeakRef in the Pipe object (for slow discovery), and the addresses in
// the ConnObjects (to be return/emitted to the end-points).
if (is_cli_tx_pipe) {
conn_objects::add_cli_tx_ipipe(conn, ipipe_addr);
} else {
conn_objects::add_srv_tx_ipipe(conn, ipipe_addr);
};
let ipipe_ref = weak_ref::new_from_address(ipipe_addr);
vector::push_back(&mut new_pipe.ipipe_refs, ipipe_ref);
i = i + 1;
let cli_ipipe_addr = inner_pipe::get_ipipe_address(&cli_ipipe);
let srv_ipipe_addr = inner_pipe::get_ipipe_address(&srv_ipipe);

conn_objects::add_cli_tx_ipipe(conn, cli_ipipe_addr);
conn_objects::add_srv_tx_ipipe(conn, srv_ipipe_addr);

let cli_ipipe_ref = weak_ref::new_from_address(cli_ipipe_addr);
let srv_ipipe_ref = weak_ref::new_from_address(srv_ipipe_addr);
vector::push_back(&mut cli_pipe.ipipe_refs, cli_ipipe_ref);
vector::push_back(&mut srv_pipe.ipipe_refs, srv_ipipe_ref);
inner_pipe::transfer(cli_ipipe, cli_recipient);
inner_pipe::transfer(srv_ipipe, srv_recipient);
ipipe_idx = ipipe_idx + 1;
};

transfer::transfer(new_pipe, recipient);

pipe_addr
(cli_pipe, srv_pipe)
}

/* TODO
public(friend) fun delete( self: Pipe, ipipes: vector<InnerPipe> ) {
let Pipe { id, flgs: _, sync_data: _, tc: _, ipipes } = self;
// Delete all the inner pipes.
// For tracking/debugging purpose, the weak ref is not removed from vector (only cleared).
let i: u64 = 0;
while (i < vector::length(&ipipes)) {
let inner_pipe_ref = vector::borrow_mut(&mut ipipes, i);
let inner_pipe_id = weak_ref::id(inner_pipe_ref);
weak_ref::clear(inner_pipe_ref);
object::delete(inner_pipe_id);
inner_pipe::delete(inner_pipe_id);
i = i + 1;
};
object::delete(id);
public(friend) fun transfer( self: Pipe, recipient: address ) {
transfer::transfer(self, recipient);
}
*/

// === Private Functions ===

// === Test Functions ===

}
}
Loading

0 comments on commit e976186

Please sign in to comment.