Skip to content

Commit

Permalink
Temporary removal of reconnect logic (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Isaac Davis committed Dec 18, 2019
1 parent 6b1cf4d commit 1353a0b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 74 deletions.
7 changes: 5 additions & 2 deletions src/proto/active_packetizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ where
self.first = false;

// find the waiting request future
let (opcode, tx) = self.reply.remove(&xid).unwrap(); // TODO: return an error if xid was unknown
let (opcode, tx) = match self.reply.remove(&xid) {
Some(tuple) => tuple,
None => bail!("No waiting request future found for xid {:?}", xid)
};

if let Some(w) = self.pending_watchers.remove(&xid) {
// normally, watches are *only* added for successful operations
Expand Down Expand Up @@ -395,7 +398,7 @@ where
self.timer.reset(time::Instant::now() + self.timeout);
}

trace!(logger, "poll_read");
trace!(logger, "poll_write");
let w = self.poll_write(exiting, logger)?;

match (r, w) {
Expand Down
74 changes: 2 additions & 72 deletions src/proto/packetizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,77 +201,7 @@ where
}
}

match self
.state
.poll(self.exiting, &mut self.logger, &mut self.default_watcher)
{
Ok(v) => Ok(v),
Err(e) => {
// if e is disconnect, then purge state and reconnect
// for now, assume all errors are disconnects
// TODO: test this!

let password = if let PacketizerState::Connected(ActivePacketizer {
ref mut password,
..
}) = self.state
{
password.split_off(0)
} else {
// XXX: error while connecting -- don't recurse (for now)
return Err(e);
};

if let PacketizerState::Connected(ActivePacketizer {
last_zxid_seen,
session_id,
..
}) = self.state
{
info!(self.logger, "connection lost; reconnecting";
"session_id" => session_id,
"last_zxid" => last_zxid_seen
);

let xid = self.xid;
self.xid += 1;

let log = self.logger.clone();
let retry = S::connect(&self.addr)
.map_err(|e| e.into())
.map(move |stream| {
let request = Request::Connect {
protocol_version: 0,
last_zxid_seen,
timeout: 0,
session_id,
passwd: password,
read_only: false,
};
trace!(log, "about to handshake (again)");

let (tx, rx) = oneshot::channel();
tokio::spawn(rx.then(move |r| {
trace!(log, "re-connection response: {:?}", r);
Ok(())
}));

let mut ap = ActivePacketizer::new(stream);
ap.enqueue(xid, request, tx);
ap
});

// dropping the old state will also cancel in-flight requests
mem::replace(
&mut self.state,
PacketizerState::Reconnecting(Box::new(retry)),
);
self.poll()
} else {
unreachable!();
}
}
}
self.state.poll(self.exiting, &mut self.logger, &mut self.default_watcher)
}
}

Expand All @@ -288,7 +218,7 @@ impl Enqueuer {
let (tx, rx) = oneshot::channel();
match self.0.unbounded_send((request, tx)) {
Ok(()) => {
Either::A(rx.map_err(|e| format_err!("failed to enqueue new request: {:?}", e)))
Either::A(rx.map_err(|e| format_err!("Error processing request: {:?}", e)))
}
Err(e) => {
Either::B(Err(format_err!("failed to enqueue new request: {:?}", e)).into_future())
Expand Down

0 comments on commit 1353a0b

Please sign in to comment.