Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unlock SensorState while connecting #32

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 110 additions & 44 deletions publish-mqtt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(())
}

#[derive(Debug)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum ConnectionStatus {
/// Not yet attempted to connect. Might already be connected from a previous
/// run of this program.
Unknown,
/// Currently connecting. Don't try again until the timeout expires.
Connecting { reserved_until: Instant },
/// Connected, but could not subscribe to updates. GATT characteristics
/// sometimes take a while to show up after connecting, so this retry is
/// a bit of a work-around.
Expand All @@ -108,7 +110,7 @@ enum ConnectionStatus {
Connected,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct Sensor {
object_path: String,
mac_address: String,
Expand Down Expand Up @@ -207,13 +209,6 @@ impl Sensor {
}
}

#[derive(Debug)]
struct SensorState {
sensors_to_connect: VecDeque<Sensor>,
sensors_connected: Vec<Sensor>,
homie: HomieDevice,
}

async fn run_sensor_system(
mut homie: HomieDevice,
bt_session: &MijiaSession,
Expand All @@ -226,6 +221,8 @@ async fn run_sensor_system(
.with_context(|| std::line!().to_string())?;

let state = Arc::new(Mutex::new(SensorState {
sensors: vec![],
next_idx: 0,
sensors_to_connect: VecDeque::new(),
sensors_connected: vec![],
homie,
Expand Down Expand Up @@ -253,15 +250,9 @@ async fn bluetooth_connection_loop(
}

{
let state = &mut *state.lock().await;
connect_first_sensor_in_queue(
bt_session,
&mut state.homie,
&mut state.sensors_connected,
&mut state.sensors_to_connect,
)
.await
.with_context(|| std::line!().to_string())?;
action_next_sensor(state.clone(), bt_session.clone())
.await
.with_context(|| std::line!().to_string())?;
}

{
Expand All @@ -278,6 +269,78 @@ async fn bluetooth_connection_loop(
}
}

#[derive(Debug)]
struct SensorState {
sensors: Vec<Sensor>,
next_idx: usize,
sensors_to_connect: VecDeque<Sensor>,
sensors_connected: Vec<Sensor>,
homie: HomieDevice,
}

async fn action_next_sensor(
state: Arc<Mutex<SensorState>>,
bt_session: MijiaSession,
) -> Result<(), anyhow::Error> {
let (idx, status) = match next_actionable_sensor(state.clone()).await {
Some(values) => values,
None => return Ok(()),
};
match status {
ConnectionStatus::Connecting { reserved_until } if reserved_until > Instant::now() => {
Ok(())
}
ConnectionStatus::Unknown
| ConnectionStatus::Connecting { .. }
| ConnectionStatus::SubscribingFailedOnce
| ConnectionStatus::Disconnected
| ConnectionStatus::MarkedDisconnected
| ConnectionStatus::WatchdogTimeOut => {
connect_sensor_at_idx(state.clone(), bt_session, idx).await?;
Ok(())
}
ConnectionStatus::Connected => {
// check_for_stale_sensor(state, bt_session, idx)
Ok(())
}
}
}

async fn next_actionable_sensor(
state: Arc<Mutex<SensorState>>,
) -> Option<(usize, ConnectionStatus)> {
let mut state = state.lock().await;
let idx = state.next_idx;
let status = state.sensors.get(idx).map(|s| s.connection_status);

match status {
None => {
state.next_idx = 0;
None
}
Some(status) => {
state.next_idx += 1;
Some((idx, status))
}
}
}

async fn clone_sensor_at_idx(state: Arc<Mutex<SensorState>>, idx: usize) -> Sensor {
state.lock().await.sensors[idx].clone()
}

async fn update_sensor_at_idx(
state: Arc<Mutex<SensorState>>,
idx: usize,
sensor: Sensor,
) -> Result<(), anyhow::Error> {
let mut state = state.lock().await;
// FIXME: find a way to assert that nobody else has modified sensor in the meantime.
// Maybe give it version number and increment it whenever it is set?
state.sensors[idx] = sensor;
Ok(())
}

async fn check_for_sensors(
state: Arc<Mutex<SensorState>>,
bt_session: &MijiaSession,
Expand Down Expand Up @@ -306,33 +369,43 @@ async fn check_for_sensors(
Ok(())
}

async fn connect_first_sensor_in_queue(
async fn connect_sensor_at_idx(
state: Arc<Mutex<SensorState>>,
bt_session: &MijiaSession,
homie: &mut HomieDevice,
sensors_connected: &mut Vec<Sensor>,
sensors_to_connect: &mut VecDeque<Sensor>,
idx: usize,
) -> Result<(), anyhow::Error> {
println!("{} sensors in queue to connect.", sensors_to_connect.len());
{
state.lock().await.sensors[idx].connection_status = ConnectionStatus::Connecting {
reserved_until: Instant::now() + Duration::from_secs(5 * 60),
}
}
// Try to connect to a sensor.
if let Some(mut sensor) = sensors_to_connect.pop_front() {
println!("Trying to connect to {}", sensor.name);
match connect_start_sensor(bt_session, homie, &mut sensor).await {
Err(e) => {
println!("Failed to connect to {}: {:?}", sensor.name, e);
sensors_to_connect.push_back(sensor);
}
Ok(()) => {
println!("Connected to {} and started notifications", sensor.name);
sensors_connected.push(sensor);
}
let mut sensor = clone_sensor_at_idx(state.clone(), idx).await;
println!("Trying to connect to {}", sensor.name);
match connect_start_sensor(bt_session, &mut sensor).await {
Err(e) => {
println!("Failed to connect to {}: {:?}", sensor.name, e);
}
Ok(()) => {
println!("Connected to {} and started notifications", sensor.name);
state
.lock()
.await
.homie
.add_node(sensor.as_node())
.await
.with_context(|| std::line!().to_string())?;
sensor.connection_status = ConnectionStatus::Connected;
sensor.last_update_timestamp = Instant::now();
}
}
update_sensor_at_idx(state, idx, sensor).await?;

Ok(())
}

async fn connect_start_sensor<'a>(
bt_session: &MijiaSession,
homie: &mut HomieDevice,
sensor: &mut Sensor,
) -> Result<(), anyhow::Error> {
println!("Connecting from status: {:?}", sensor.connection_status);
Expand All @@ -341,20 +414,13 @@ async fn connect_start_sensor<'a>(
.await
.with_context(|| std::line!().to_string())?;
match start_notify_sensor(bt_session, &sensor.object_path).await {
Ok(()) => {
homie
.add_node(sensor.as_node())
.await
.with_context(|| std::line!().to_string())?;
sensor.connection_status = ConnectionStatus::Connected;
sensor.last_update_timestamp = Instant::now();
Ok(())
}
Ok(()) => Ok(()),
Err(e) => {
// If starting notifications failed a second time, disconnect so
// that we start again from a clean state next time.
match sensor.connection_status {
ConnectionStatus::Unknown
| ConnectionStatus::Connecting { .. }
| ConnectionStatus::Disconnected
| ConnectionStatus::MarkedDisconnected
| ConnectionStatus::WatchdogTimeOut => {
Expand Down