Fixed bitrate calculation #2018

Closed · wants to merge 3 commits
15 changes: 15 additions & 0 deletions alvr/client_core/src/connection.rs
@@ -271,6 +271,7 @@ fn connection_pipeline(capabilities: ClientCapabilities) -> ConResult {
stream_socket.subscribe_to_stream::<Haptics>(HAPTICS, MAX_UNREAD_PACKETS);
let statistics_sender = stream_socket.request_stream(STATISTICS);

let mut actual_throughput_inseconds: f32 = 30_000_000.0;
@zarik5 (Member) commented on Mar 6, 2024:
Rename to actual_throughput_bps

let video_receive_thread = thread::spawn(move || {
let mut stream_corrupted = false;
while is_streaming() {
@@ -283,8 +284,22 @@ fn connection_pipeline(capabilities: ClientCapabilities) -> ConResult {
return;
};

let dt_throughput = data.get_throughput_timediff();

if dt_throughput != Duration::ZERO {
// TODO: Assuming UDP, hence 42.0; for TCP it would be 54.0 instead. This loses a bit of accuracy for TCP (it's still a good estimate), but it might require importing session settings in the future
Comment on lines +289 to +290 (Member):
I would like you to make this correct for TCP from the start

let data_including_headers = (data.get_size_frame_bytes()) as f32
+ 42.0 * (data.get_shards_in_frame() as f32); // UDP and IPv4 headers count as bytes for throughput too
actual_throughput_inseconds =
data_including_headers * 8.0 / dt_throughput.as_secs_f32(); // the encoder bitrate is in bits per second and the size is in bytes, so multiply by 8
} else {
actual_throughput_inseconds = 0.0;
}

if let Some(stats) = &mut *STATISTICS_MANAGER.lock() {
stats.report_video_packet_received(header.timestamp);
stats.report_throughput_client(header.timestamp, actual_throughput_inseconds)
}

if header.is_idr {
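For reference (not part of the diff above), a minimal sketch of how the per-frame throughput estimate could be made transport-aware from the start, as the reviewer requests. The `is_tcp` flag, the helper name, and the constant names are assumptions for illustration; only the 42.0/54.0 values come from the TODO in the PR.

```rust
use std::time::Duration;

// Per-shard packet header overhead in bytes, taken from the TODO in the PR:
// 42 bytes per shard over UDP, 54 over TCP (IPv4).
const UDP_SHARD_OVERHEAD_BYTES: f32 = 42.0;
const TCP_SHARD_OVERHEAD_BYTES: f32 = 54.0;

/// Hypothetical helper: estimated throughput in bits per second for one frame.
fn estimate_throughput_bps(
    frame_bytes: usize,
    shards_in_frame: usize,
    dt: Duration,
    is_tcp: bool,
) -> f32 {
    if dt.is_zero() {
        // Avoid division by zero; the caller treats 0.0 as "no estimate".
        return 0.0;
    }
    let overhead = if is_tcp {
        TCP_SHARD_OVERHEAD_BYTES
    } else {
        UDP_SHARD_OVERHEAD_BYTES
    };
    let total_bytes = frame_bytes as f32 + overhead * shards_in_frame as f32;
    total_bytes * 8.0 / dt.as_secs_f32() // bytes -> bits, then divide by the time delta
}
```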
9 changes: 9 additions & 0 deletions alvr/client_core/src/statistics.rs
@@ -94,6 +94,15 @@ impl StatisticsManager {
}
}

pub fn report_throughput_client(&mut self, target_timestamp: Duration, throughput_client: f32) {
Comment (Member):
rename to throughput_client_bps

if let Some(frame) = self
.history_buffer
.iter_mut()
.find(|frame| frame.client_stats.target_timestamp == target_timestamp)
{
frame.client_stats.throughput_client = throughput_client;
}
}
// vsync_queue is the latency between this call and the vsync. it cannot be measured by ALVR and
// should be reported by the VR runtime
pub fn report_submit(&mut self, target_timestamp: Duration, vsync_queue: Duration) {
1 change: 1 addition & 0 deletions alvr/packets/src/lib.rs
@@ -314,6 +314,7 @@ pub struct ClientStatistics {
pub rendering: Duration,
pub vsync_queue: Duration,
pub total_pipeline_latency: Duration,
pub throughput_client: f32,
Comment (Member):
Same as before

}

#[derive(Serialize, Deserialize, Clone, Debug)]
3 changes: 2 additions & 1 deletion alvr/server/src/bitrate.rs
@@ -99,6 +99,7 @@ impl BitrateManager {
timestamp: Duration,
network_latency: Duration,
decoder_latency: Duration,
throughput_reported_client: f32,
Comment (Member):
Same as before

) {
if network_latency.is_zero() {
return;
@@ -109,7 +110,7 @@
while let Some(&(timestamp_, size_bits)) = self.packet_sizes_bits_history.front() {
if timestamp_ == timestamp {
self.bitrate_average
.submit_sample(size_bits as f32 / network_latency.as_secs_f32());
.submit_sample(throughput_reported_client);

self.packet_sizes_bits_history.pop_front();

2 changes: 2 additions & 0 deletions alvr/server/src/connection.rs
@@ -1015,6 +1015,7 @@ fn connection_pipeline(
if let Some(stats) = &mut *STATISTICS_MANAGER.lock() {
let timestamp = client_stats.target_timestamp;
let decoder_latency = client_stats.video_decode;
let throughput_client: f32 = client_stats.throughput_client;
let network_latency = stats.report_statistics(client_stats);

let server_data_lock = SERVER_DATA_MANAGER.read();
@@ -1023,6 +1024,7 @@
timestamp,
network_latency,
decoder_latency,
throughput_client,
);
}
}
8 changes: 1 addition & 7 deletions alvr/server/src/statistics.rs
@@ -271,13 +271,7 @@ impl StatisticsManager {
self.packets_lost_partial_sum = 0;
}

// While not accurate, this prevents NaNs and zeros that would cause a crash or pollute
// the graph
let bitrate_bps = if network_latency != Duration::ZERO {
frame.video_packet_bytes as f32 * 8.0 / network_latency.as_secs_f32()
} else {
0.0
};
let bitrate_bps = client_stats.throughput_client;

// todo: use target timestamp in nanoseconds. the dashboard needs to use the first
// timestamp as the graph time origin.
54 changes: 54 additions & 0 deletions alvr/sockets/src/stream_socket.rs
@@ -30,6 +30,7 @@ use std::{
net::{IpAddr, TcpListener, UdpSocket},
sync::{mpsc, Arc},
time::Duration,
time::Instant,
};

const SHARD_PREFIX_SIZE: usize = mem::size_of::<u32>() // packet length - field itself (4 bytes)
@@ -169,12 +170,24 @@ pub struct ReceiverData<H> {
used_buffer_queue: mpsc::Sender<Vec<u8>>,
had_packet_loss: bool,
_phantom: PhantomData<H>,
throughput_time_diff_frame: Duration,
shards_in_frame: usize,
}

impl<H> ReceiverData<H> {
pub fn had_packet_loss(&self) -> bool {
self.had_packet_loss
}

pub fn get_throughput_timediff(&self) -> Duration {
self.throughput_time_diff_frame
}
pub fn get_size_frame_bytes(&self) -> usize {
self.size
}
pub fn get_shards_in_frame(&self) -> usize {
self.shards_in_frame
}
}

impl<H: DeserializeOwned> ReceiverData<H> {
@@ -202,8 +215,12 @@ struct ReconstructedPacket {
index: u32,
buffer: Vec<u8>,
size: usize, // contains prefix
throughput_diff_frame: Duration,
shards_in_frame: usize,
}

pub const VIDEO_ID: u16 = 3;

pub struct StreamReceiver<H> {
packet_receiver: mpsc::Receiver<ReconstructedPacket>,
used_buffer_queue: mpsc::Sender<Vec<u8>>,
@@ -257,6 +274,8 @@ impl<H: DeserializeOwned + Serialize> StreamReceiver<H> {
used_buffer_queue: self.used_buffer_queue.clone(),
had_packet_loss,
_phantom: PhantomData,
throughput_time_diff_frame: packet.throughput_diff_frame,
shards_in_frame: packet.shards_in_frame,
})
}
}
@@ -323,6 +342,10 @@ impl StreamSocketBuilder {
receive_socket,
shard_recv_state: None,
stream_recv_components: HashMap::new(),
first_shard_frame_rx: Some(Instant::now()),
last_shard_frame_rx: Some(Instant::now()),
throughput_time_diff: Some(Duration::from_millis(100)),
last_new_frame_id: 0,
})
}

@@ -368,6 +391,10 @@ impl StreamSocketBuilder {
receive_socket,
shard_recv_state: None,
stream_recv_components: HashMap::new(),
first_shard_frame_rx: Some(Instant::now()),
last_shard_frame_rx: Some(Instant::now()),
throughput_time_diff: Some(Duration::from_millis(100)),
last_new_frame_id: 0,
})
}
}
@@ -405,6 +432,10 @@ pub struct StreamSocket {
receive_socket: Box<dyn SocketReader>,
shard_recv_state: Option<RecvState>,
stream_recv_components: HashMap<u16, StreamRecvComponents>,
first_shard_frame_rx: Option<Instant>,
last_shard_frame_rx: Option<Instant>,
throughput_time_diff: Option<Duration>,
last_new_frame_id: u32,
}

impl StreamSocket {
@@ -476,6 +507,24 @@ impl StreamSocket {
let shards_count = u32::from_be_bytes(bytes[10..14].try_into().unwrap()) as usize;
let shard_index = u32::from_be_bytes(bytes[14..18].try_into().unwrap()) as usize;

if stream_id == VIDEO_ID {
if packet_index != self.last_new_frame_id {
self.last_new_frame_id = packet_index; // only if we receive a new frame
if shard_index == 0 {
self.first_shard_frame_rx = Some(Instant::now());
}
} else if shard_index == shards_count - 1 {
// if it's the same frame as in first_shard_frame_rx and we haven't received a new frame yet, compute time difference
self.throughput_time_diff = Some(
Instant::now().saturating_duration_since(
self.last_shard_frame_rx
.unwrap_or_else(|| Instant::now() - Duration::from_millis(555)),
),
);
self.last_shard_frame_rx = Some(Instant::now());
}
}

self.shard_recv_state.insert(RecvState {
shard_length,
stream_id,
@@ -593,6 +642,7 @@ impl StreamSocket {
// Check if packet is complete and send
if in_progress_packet.received_shard_indices.len() == shard_recv_state_mut.shards_count {
let size = in_progress_packet.buffer_length;
let get_shards_in_frame = shard_recv_state_mut.shards_count;
components
.packet_queue
.send(ReconstructedPacket {
@@ -603,6 +653,10 @@
.unwrap()
.buffer,
size,
throughput_diff_frame: self
.throughput_time_diff
.unwrap_or(Duration::from_millis(100)),
shards_in_frame: get_shards_in_frame,
})
.ok();

4 changes: 4 additions & 0 deletions wiki/How-ALVR-works.md
@@ -274,6 +274,10 @@ The specific packet format used over the network is not clearly defined since AL

Since the amount of data streamed is large, the socket buffer size is increased both on the driver side and on the client.
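
For illustration only (this is not the code ALVR uses, and the buffer size is whatever value the caller chooses), enlarging the receive buffer of a UDP socket can look like this with the socket2 crate:

```rust
use socket2::Socket;
use std::net::UdpSocket;

// Convert the std socket to a socket2 handle to set SO_RCVBUF, then convert back.
fn with_larger_recv_buffer(socket: UdpSocket, bytes: usize) -> std::io::Result<UdpSocket> {
    let socket = Socket::from(socket);
    socket.set_recv_buffer_size(bytes)?;
    Ok(socket.into())
}
```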

Previous versions of ALVR estimated, on the server, the network throughput received by the client (for video) by dividing the size in bits of each frame by the network latency. Unfortunately, this approach suffered from secondary issues caused by noise and non-deterministic changes in the RTT. These problems mainly affected the effective bitrate of the video stream and could cause large drops in quality when network latency increased.

A change has been made so that, at the stream_socket level, the client keeps in memory the Instant at which the last shard of a video frame was received (using the shard_index field in the header). When a newer frame is received, the time difference between consecutive frames can be computed on the client. This time difference is then used at a higher level, in client_core/src/connection.rs, to compute an estimate of the throughput received by the client for each frame, also taking the individual shard headers into account. This value is passed through ClientStatistics to the server, and from server/src/connection.rs the client throughput is finally used by the BitrateManager, where the reported client value is submitted to the bitrate average.
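
As a rough worked example (the numbers are made up, not measured): a 75 000-byte frame split into 53 shards, received 11 ms after the previous frame over UDP, would be reported as approximately:

```
header overhead = 53 shards * 42 bytes            = 2 226 bytes
throughput      = (75 000 + 2 226) * 8 / 0.011 s  ≈ 56.2 Mbps
```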

## SteamVR driver

The driver is the component responsible for most of the streamer functionality. It is implemented as a shared library loaded by SteamVR. It implements the [OpenVR API](https://github.com/ValveSoftware/openvr) in order to interface with SteamVR.