Skip to content

Commit

Permalink
Merge branch 'master' into gb28181
Browse files Browse the repository at this point in the history
# Conflicts:
#	Cargo.lock
#	protocol/rtmp/src/remuxer/rtsp2rtmp.rs
  • Loading branch information
hailiang8 committed Jan 13, 2024
2 parents eeb591c + 999c87a commit d1c63f7
Show file tree
Hide file tree
Showing 28 changed files with 540 additions and 158 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions application/xiu/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ pub async fn run(producer: StreamHubEventSender, port: usize) {
.route("/get_stream_status", get(status))
.route("/kick_off_client", post(kick));

log::info!("Http api server listening on http://:{}", port);
axum::Server::bind(&([127, 0, 0, 1], port as u16).into())
log::info!("Http api server listening on http://0.0.0.0:{}", port);
axum::Server::bind(&([0, 0, 0, 0], port as u16).into())
.serve(app.into_make_service())
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions application/xiu/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
xiu::{config, config::Config, service::Service},
};

// #[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<()> {
let log_levels = vec!["trace", "debug", "info", "warn", "error"];
Expand Down
25 changes: 22 additions & 3 deletions library/streamhub/src/define.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct SubscriberInfo {
pub id: Uuid,
pub sub_type: SubscribeType,
pub notify_info: NotifyInfo,
pub sub_data_type: SubDataType,
}

impl Serialize for SubscriberInfo {
Expand All @@ -86,6 +87,7 @@ impl Serialize for SubscriberInfo {
pub struct PublisherInfo {
pub id: Uuid,
pub pub_type: PublishType,
pub pub_data_type: PubDataType,
pub notify_info: NotifyInfo,
}

Expand Down Expand Up @@ -165,7 +167,11 @@ pub type AvStatisticSender = mpsc::UnboundedSender<StreamStatistics>;
pub type AvStatisticReceiver = mpsc::UnboundedReceiver<StreamStatistics>;

pub type StreamStatisticSizeSender = oneshot::Sender<usize>;
pub type StreamStatisticSizeReceiver = oneshot::Sender<usize>;
pub type StreamStatisticSizeReceiver = oneshot::Receiver<usize>;

pub type SubEventExecuteResultSender = oneshot::Sender<Result<DataReceiver, ChannelError>>;
pub type PubEventExecuteResultSender =
oneshot::Sender<Result<(Option<FrameDataSender>, Option<PacketDataSender>), ChannelError>>;

#[async_trait]
pub trait TStreamHandler: Send + Sync {
Expand All @@ -190,14 +196,27 @@ pub enum DataSender {
Frame { sender: FrameDataSender },
Packet { sender: PacketDataSender },
}
//we can only sub one kind of stream.
#[derive(Debug, Clone, Serialize)]
pub enum SubDataType {
Frame,
Packet,
}
//we can pub frame or packet or both.
#[derive(Debug, Clone, Serialize)]
pub enum PubDataType {
Frame,
Packet,
Both,
}

#[derive(Serialize)]
pub enum StreamHubEvent {
Subscribe {
identifier: StreamIdentifier,
info: SubscriberInfo,
#[serde(skip_serializing)]
sender: DataSender,
result_sender: SubEventExecuteResultSender,
},
UnSubscribe {
identifier: StreamIdentifier,
Expand All @@ -207,7 +226,7 @@ pub enum StreamHubEvent {
identifier: StreamIdentifier,
info: PublisherInfo,
#[serde(skip_serializing)]
receiver: DataReceiver,
result_sender: PubEventExecuteResultSender,
#[serde(skip_serializing)]
stream_handler: Arc<dyn TStreamHandler>,
},
Expand Down
11 changes: 11 additions & 0 deletions library/streamhub/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytesio::bytes_errors::BytesReadError;
use bytesio::bytes_errors::BytesWriteError;
use failure::Backtrace;

use {failure::Fail, std::fmt};
#[derive(Debug, Fail)]
Expand Down Expand Up @@ -36,6 +37,16 @@ impl fmt::Display for ChannelError {
}
}

impl Fail for ChannelError {
fn cause(&self) -> Option<&dyn Fail> {
self.value.cause()
}

fn backtrace(&self) -> Option<&Backtrace> {
self.value.backtrace()
}
}

impl From<BytesReadError> for ChannelError {
fn from(error: BytesReadError) -> Self {
ChannelError {
Expand Down
97 changes: 88 additions & 9 deletions library/streamhub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,25 +337,71 @@ impl StreamsHub {
match message {
StreamHubEvent::Publish {
identifier,
receiver,
info,
result_sender,
stream_handler,
} => {
let rv = self
let (frame_sender, packet_sender, receiver) = match info.pub_data_type {
define::PubDataType::Frame => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
Some(sender_chan),
None,
DataReceiver {
frame_receiver: Some(receiver_chan),
packet_receiver: None,
},
)
}
define::PubDataType::Packet => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
None,
Some(sender_chan),
DataReceiver {
frame_receiver: None,
packet_receiver: Some(receiver_chan),
},
)
}
define::PubDataType::Both => {
let (sender_frame_chan, receiver_frame_chan) =
mpsc::unbounded_channel();
let (sender_packet_chan, receiver_packet_chan) =
mpsc::unbounded_channel();

(
Some(sender_frame_chan),
Some(sender_packet_chan),
DataReceiver {
frame_receiver: Some(receiver_frame_chan),
packet_receiver: Some(receiver_packet_chan),
},
)
}
};

let result = match self
.publish(identifier.clone(), receiver, stream_handler)
.await;
match rv {
.await
{
Ok(()) => {
if let Some(notifier) = &self.notifier {
notifier.on_publish_notify(event_serialize_str).await;
}
self.streams_info
.insert(info.id, PubSubInfo::Publish { identifier });

Ok((frame_sender, packet_sender))
}
Err(err) => {
log::error!("event_loop Publish err: {}", err);
continue;
Err(err)
}
};

if result_sender.send(result).is_err() {
log::error!("event_loop Subscribe error: The receiver dropped.")
}
}

Expand All @@ -378,12 +424,40 @@ impl StreamsHub {
StreamHubEvent::Subscribe {
identifier,
info,
sender,
result_sender,
} => {
let sub_id = info.id;
let info_clone = info.clone();
let rv = self.subscribe(&identifier, info_clone, sender).await;
match rv {

//new chan for Frame/Packet sender and receiver
let (sender, receiver) = match info.sub_data_type {
define::SubDataType::Frame => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
DataSender::Frame {
sender: sender_chan,
},
DataReceiver {
frame_receiver: Some(receiver_chan),
packet_receiver: None,
},
)
}
define::SubDataType::Packet => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
DataSender::Packet {
sender: sender_chan,
},
DataReceiver {
frame_receiver: None,
packet_receiver: Some(receiver_chan),
},
)
}
};

let rv = match self.subscribe(&identifier, info_clone, sender).await {
Ok(()) => {
if let Some(notifier) = &self.notifier {
notifier.on_play_notify(event_serialize_str).await;
Expand All @@ -396,11 +470,16 @@ impl StreamsHub {
sub_info: info,
},
);
Ok(receiver)
}
Err(err) => {
log::error!("event_loop Subscribe error: {}", err);
continue;
Err(err)
}
};

if result_sender.send(rv).is_err() {
log::error!("event_loop Subscribe error: The receiver dropped.")
}
}
StreamHubEvent::UnSubscribe { identifier, info } => {
Expand Down
2 changes: 1 addition & 1 deletion protocol/gb28181/src/gb28181.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mod tests {
use std::io::Read;
use std::net::UdpSocket;

use std::thread::{self, sleep};
use std::thread::{self};
use std::time::Duration;
#[test]
fn send_dump_file() {
Expand Down
32 changes: 27 additions & 5 deletions protocol/gb28181/src/session/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use {
failure::{Backtrace, Fail},
std::fmt,
std::str::Utf8Error,
streamhub::errors::ChannelError,
tokio::sync::oneshot::error::RecvError,
xmpegts::errors::MpegError,
};

Expand All @@ -14,16 +16,20 @@ pub struct SessionError {

#[derive(Debug, Fail)]
pub enum SessionErrorValue {
#[fail(display = "net io error: {}\n", _0)]
#[fail(display = "net io error: {}", _0)]
BytesIOError(#[cause] BytesIOError),
#[fail(display = "bytes read error: {}\n", _0)]
#[fail(display = "bytes read error: {}", _0)]
BytesReadError(#[cause] BytesReadError),
#[fail(display = "bytes write error: {}\n", _0)]
#[fail(display = "bytes write error: {}", _0)]
BytesWriteError(#[cause] BytesWriteError),
#[fail(display = "Utf8Error: {}\n", _0)]
#[fail(display = "Utf8Error: {}", _0)]
Utf8Error(#[cause] Utf8Error),
#[fail(display = "MpegError: {}\n", _0)]
#[fail(display = "MpegError: {}", _0)]
MpegError(#[cause] MpegError),
#[fail(display = "event execute error: {}", _0)]
ChannelError(#[cause] ChannelError),
#[fail(display = "tokio: oneshot receiver err: {}", _0)]
RecvError(#[cause] RecvError),
#[fail(display = "stream hub event send error\n")]
StreamHubEventSendErr,
#[fail(display = "cannot receive frame data from stream hub\n")]
Expand Down Expand Up @@ -70,6 +76,22 @@ impl From<Utf8Error> for SessionError {
}
}

impl From<ChannelError> for SessionError {
fn from(error: ChannelError) -> Self {
SessionError {
value: SessionErrorValue::ChannelError(error),
}
}
}

impl From<RecvError> for SessionError {
fn from(error: RecvError) -> Self {
SessionError {
value: SessionErrorValue::RecvError(error),
}
}
}

impl fmt::Display for SessionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.value, f)
Expand Down
Loading

0 comments on commit d1c63f7

Please sign in to comment.