Skip to content

Commit

Permalink
chore: clean table
Browse files Browse the repository at this point in the history
  • Loading branch information
fengyc committed Jun 13, 2023
1 parent 2c83f70 commit b63b909
Showing 1 changed file with 52 additions and 35 deletions.
87 changes: 52 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,42 +290,51 @@ where
let mut buffer = BytesMut::with_capacity(max_packet_size);
let mut stop_rx = stop_rx;

loop {
// prepare buffer
buffer.reserve(max_packet_size);
unsafe { buffer.set_len(max_packet_size) };
let block_result: Result<()> = async move {
loop {
// prepare buffer
buffer.reserve(max_packet_size);
unsafe { buffer.set_len(max_packet_size) };

// read packet until error or stopped
let n = select! {
_ = stop_rx.recv() => break,
r = read_ams_packet(&mut reader, &mut buffer[..]) => r?,
};
unsafe { buffer.set_len(n) };

// parse result
let ams_header = parse_ams_packet_slice(&buffer)?.1;
log::debug!("ams packet: {}", ams_header);
let source_ams_addr = AmsAddr(ams_header.source_net_id(), ams_header.source_port());
let target_ams_addr = AmsAddr(ams_header.target_net_id(), ams_header.target_port());

// update forward table
if Some(true) != table.read().await.get(&source_ams_addr).map(|x| x.0 == remote) {
log::info!("update forward table {} socket {}", source_ams_addr, remote);
let mut table = table.write().await;
table.insert(source_ams_addr, (remote, data_tx.clone()));
}

// read packet until error or stopped
let n = select! {
_ = stop_rx.recv() => break,
r = read_ams_packet(&mut reader, &mut buffer[..]) => r?,
};
unsafe { buffer.set_len(n) };

// parse result
let ams_header = parse_ams_packet_slice(&buffer)?.1;
log::debug!("ams packet: {}", ams_header);
let source_ams_addr = AmsAddr(ams_header.source_net_id(), ams_header.source_port());
let target_ams_addr = AmsAddr(ams_header.target_net_id(), ams_header.target_port());

// update forward table
if Some(true) != table.read().await.get(&source_ams_addr).map(|x| x.0 == remote) {
log::info!("update forward table {} socket {}", source_ams_addr, remote);
let mut table = table.write().await;
table.insert(source_ams_addr, (remote, data_tx.clone()));
// forward data
let packet = buffer.split();
let forward_table = table.read().await;
let target = forward_table.get(&target_ams_addr);
let target = target.or_else(|| forward_table.get(&AmsAddr(target_ams_addr.0, 0)));
if let Some((target_remote, target_sender)) = target {
log::debug!("forward {} to socket {}", target_ams_addr, target_remote);
target_sender.send(packet).await?;
} else {
log::debug!("can not handle {}", target_ams_addr)
}
}

// forward data
let packet = buffer.split();
let forward_table = table.read().await;
let target = forward_table.get(&target_ams_addr);
let target = target.or_else(|| forward_table.get(&AmsAddr(target_ams_addr.0, 0)));
if let Some((target_remote, target_sender)) = target {
log::debug!("forward {} to socket {}", target_ams_addr, target_remote);
target_sender.send(packet).await?;
} else {
log::debug!("can not handle {}", target_ams_addr)
}
Ok(())
}
.await;

if let Err(e) = block_result {
log::info!("read socket {} error: {}", remote, e);
}

log::info!("reading socket {} stopped", remote);
Expand All @@ -349,10 +358,16 @@ where
_ => break,
},
};
if packet.is_empty() {
break;
}
// write data
select! {
_ = stop_rx.recv() => break,
r = writer.write_all(&packet) => r?,
r = writer.write_all(&packet) => if let Err(e) = r {
log::error!("write socket {} error: {}", socket_addr, e);
break;
}
}
log::debug!("write socket {} {} bytes", socket_addr, packet.len());
}
Expand Down Expand Up @@ -447,8 +462,9 @@ async fn accept_client(args: Arc<Args>, table: Table, stop_receiver: EventReceiv
let (stop_tx, stop_rx) = broadcast::channel(2);

let stop_rx1 = stop_rx.resubscribe();
let data_tx1 = data_tx.clone();
let writing = writing(remote, client_write, stop_rx1, data_rx);
let reading = reading(remote, client_read, stop_rx, packet_size, data_tx, table.clone());
let reading = reading(remote, client_read, stop_rx, packet_size, data_tx1, table.clone());

if let Err(e) = select! {
r = reading => r,
Expand All @@ -460,6 +476,7 @@ async fn accept_client(args: Arc<Args>, table: Table, stop_receiver: EventReceiv
if let Err(e) = stop_tx.send(()) {
log::error!("stop client {} error: {}", remote, e);
}
data_tx.send(BytesMut::with_capacity(0)).await;

// clean table
table.write().await.retain(|a, x| {
Expand Down

0 comments on commit b63b909

Please sign in to comment.