Skip to content

Commit

Permalink
unlock SensorState while connecting
Browse files Browse the repository at this point in the history
  • Loading branch information
alsuren committed Sep 13, 2020
1 parent 6c6c0eb commit 812b91f
Showing 1 changed file with 53 additions and 39 deletions.
92 changes: 53 additions & 39 deletions publish-mqtt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(())
}

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum ConnectionStatus {
/// Not yet attempted to connect. Might already be connected from a previous
/// run of this program.
Expand All @@ -111,7 +111,7 @@ enum ConnectionStatus {
Connected,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct Sensor {
object_path: String,
mac_address: String,
Expand Down Expand Up @@ -241,15 +241,9 @@ async fn bluetooth_mainloop(
}

{
let state = &mut *state.lock().await;
connect_first_sensor_in_queue(
bt_session.clone(),
&mut state.homie,
&mut state.sensors_connected,
&mut state.sensors_to_connect,
)
.await
.with_context(|| std::line!().to_string())?;
action_next_sensor(state.clone(), bt_session.clone())
.await
.with_context(|| std::line!().to_string())?;
}

{
Expand Down Expand Up @@ -284,11 +278,12 @@ struct SensorState {
sensors_connected: Vec<Sensor>,
homie: HomieDevice,
}

async fn action_next_sensor(
state: Arc<Mutex<SensorState>>,
bt_session: MijiaSession,
) -> Result<(), anyhow::Error> {
let (idx, status) = match next_actionable_sensor(state).await {
let (idx, status) = match next_actionable_sensor(state.clone()).await {
Some(values) => values,
None => return Ok(()),
};
Expand All @@ -302,7 +297,7 @@ async fn action_next_sensor(
| ConnectionStatus::Disconnected
| ConnectionStatus::MarkedDisconnected
| ConnectionStatus::WatchdogTimeOut => {
// connect_sensor_idx(state, bt_session, idx)
connect_sensor_at_idx(state.clone(), bt_session, idx).await?;
Ok(())
}
ConnectionStatus::Connected => {
Expand All @@ -311,6 +306,7 @@ async fn action_next_sensor(
}
}
}

async fn next_actionable_sensor(
state: Arc<Mutex<SensorState>>,
) -> Option<(usize, ConnectionStatus)> {
Expand All @@ -330,6 +326,22 @@ async fn next_actionable_sensor(
}
}

async fn clone_sensor_at_idx(state: Arc<Mutex<SensorState>>, idx: usize) -> Sensor {
state.lock().await.sensors[idx].clone()
}

async fn update_sensor_at_idx(
state: Arc<Mutex<SensorState>>,
idx: usize,
sensor: Sensor,
) -> Result<(), anyhow::Error> {
let mut state = state.lock().await;
// FIXME: find a way to assert that nobody else has modified sensor in the meantime.
// Maybe give it version number and increment it whenever it is set?
state.sensors[idx] = sensor;
Ok(())
}

async fn check_for_sensors(
state: Arc<Mutex<SensorState>>,
bt_session: MijiaSession,
Expand Down Expand Up @@ -360,33 +372,43 @@ async fn check_for_sensors(
Ok(())
}

async fn connect_first_sensor_in_queue(
async fn connect_sensor_at_idx(
state: Arc<Mutex<SensorState>>,
bt_session: MijiaSession,
homie: &mut HomieDevice,
sensors_connected: &mut Vec<Sensor>,
sensors_to_connect: &mut VecDeque<Sensor>,
idx: usize,
) -> Result<(), anyhow::Error> {
println!("{} sensors in queue to connect.", sensors_to_connect.len());
{
state.lock().await.sensors[idx].connection_status = ConnectionStatus::Connecting {
reserved_until: Instant::now() + Duration::from_secs(5 * 60),
}
}
// Try to connect to a sensor.
if let Some(mut sensor) = sensors_to_connect.pop_front() {
println!("Trying to connect to {}", sensor.name);
match connect_start_sensor(bt_session.clone(), homie, &mut sensor).await {
Err(e) => {
println!("Failed to connect to {}: {:?}", sensor.name, e);
sensors_to_connect.push_back(sensor);
}
Ok(()) => {
println!("Connected to {} and started notifications", sensor.name);
sensors_connected.push(sensor);
}
let mut sensor = clone_sensor_at_idx(state.clone(), idx).await;
println!("Trying to connect to {}", sensor.name);
match connect_start_sensor(bt_session.clone(), &mut sensor).await {
Err(e) => {
println!("Failed to connect to {}: {:?}", sensor.name, e);
}
Ok(()) => {
println!("Connected to {} and started notifications", sensor.name);
state
.lock()
.await
.homie
.add_node(sensor.as_node())
.await
.with_context(|| std::line!().to_string())?;
sensor.connection_status = ConnectionStatus::Connected;
sensor.last_update_timestamp = Instant::now();
}
}
update_sensor_at_idx(state, idx, sensor).await?;

Ok(())
}

async fn connect_start_sensor<'a>(
bt_session: MijiaSession,
homie: &mut HomieDevice,
sensor: &mut Sensor,
) -> Result<(), anyhow::Error> {
println!("Connecting from status: {:?}", sensor.connection_status);
Expand All @@ -395,15 +417,7 @@ async fn connect_start_sensor<'a>(
.await
.with_context(|| std::line!().to_string())?;
match start_notify_sensor(bt_session.clone(), &sensor.object_path).await {
Ok(()) => {
homie
.add_node(sensor.as_node())
.await
.with_context(|| std::line!().to_string())?;
sensor.connection_status = ConnectionStatus::Connected;
sensor.last_update_timestamp = Instant::now();
Ok(())
}
Ok(()) => Ok(()),
Err(e) => {
// If starting notifications failed, disconnect so that we start again from a clean
// state next time.
Expand Down

0 comments on commit 812b91f

Please sign in to comment.