Skip to content

Commit

Permalink
Revert "recover: #1517 Kill all pending accepts when TCP listener is …
Browse files Browse the repository at this point in the history
…closed (#2224)" (#2239)

Crashes while running wrk against
js/deps/https/deno.land/std/http/http_bench.ts

This reverts commit 972ac03.
  • Loading branch information
ry committed Apr 28, 2019
1 parent a4551c8 commit 1af02b4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 110 deletions.
37 changes: 0 additions & 37 deletions cli/resources.rs
Expand Up @@ -171,49 +171,12 @@ impl Resource {
}
}

/// Track the current task (for TcpListener resource).
/// Throws an error if another task is already tracked.
pub fn track_task(&mut self) -> Result<(), std::io::Error> {
let mut table = RESOURCE_TABLE.lock().unwrap();
// Only track if is TcpListener.
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
// Currently, we only allow tracking a single accept task for a listener.
// This might be changed in the future with multiple workers.
// Caveat: TcpListener by itself also only tracks an accept task at a time.
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
if t.is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Another accept task is ongoing",
));
}
t.replace(futures::task::current());
}
Ok(())
}

/// Stop tracking a task (for TcpListener resource).
/// Happens when the task is done and thus no further tracking is needed.
pub fn untrack_task(&mut self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
// Only untrack if is TcpListener.
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
assert!(t.is_some());
t.take();
}
}

// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the RESOURCE_TABLE.
pub fn close(&self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&self.rid);
assert!(r.is_some());
// If TcpListener, we must kill all pending accepts!
if let Repr::TcpListener(_, Some(t)) = r.unwrap() {
// Call notify on the tracked task, so that they would error out.
t.notify();
}
}

pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
Expand Down
21 changes: 2 additions & 19 deletions cli/tokio_util.rs
Expand Up @@ -78,31 +78,14 @@ pub fn accept(r: Resource) -> Accept {
pub struct Accept {
state: AcceptState,
}

impl Future for Accept {
type Item = (TcpStream, SocketAddr);
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, addr) = match self.state {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
AcceptState::Pending(ref mut r) => match r.poll_accept() {
Ok(futures::prelude::Async::Ready(t)) => {
r.untrack_task();
t
}
Ok(futures::prelude::Async::NotReady) => {
// Would error out if another accept task is being tracked.
r.track_task()?;
return Ok(futures::prelude::Async::NotReady);
}
Err(e) => {
r.untrack_task();
return Err(From::from(e));
}
},
AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
AcceptState::Empty => panic!("poll Accept after it's done"),
};

Expand Down
92 changes: 38 additions & 54 deletions js/net_test.ts
@@ -1,22 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { testPerm, assert, assertEquals } from "./test_util.ts";

function deferred(): {
resolve: () => void;
reject: () => void;
promise: Promise<void>;
} {
let resolve: () => void;
let reject: () => void;
const promise = new Promise<void>(
(a, b): void => {
resolve = a;
reject = b;
}
);
return { resolve, reject, promise };
}

testPerm({ net: true }, function netListenClose(): void {
const listener = Deno.listen("tcp", "127.0.0.1:4500");
listener.close();
Expand Down Expand Up @@ -88,25 +72,24 @@ testPerm({ net: true }, async function netDialListen(): Promise<void> {
conn.close();
});

testPerm({ net: true }, async function netCloseReadSuccess(): Promise<void> {
/* TODO Fix broken test.
testPerm({ net: true }, async function netCloseReadSuccess() {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
const closeReadDeferred = deferred();
listener.accept().then(
async (conn): Promise<void> => {
await closeReadDeferred.promise;
await conn.write(new Uint8Array([1, 2, 3]));
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEquals(3, readResult.nread);
assertEquals(4, buf[0]);
assertEquals(5, buf[1]);
assertEquals(6, buf[2]);
conn.close();
closeDeferred.resolve();
}
);
listener.accept().then(async conn => {
await closeReadDeferred.promise;
await conn.write(new Uint8Array([1, 2, 3]));
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEquals(3, readResult.nread);
assertEquals(4, buf[0]);
assertEquals(5, buf[1]);
assertEquals(6, buf[2]);
conn.close();
closeDeferred.resolve();
});
const conn = await Deno.dial("tcp", addr);
conn.closeRead(); // closing read
closeReadDeferred.resolve();
Expand All @@ -120,18 +103,18 @@ testPerm({ net: true }, async function netCloseReadSuccess(): Promise<void> {
listener.close();
conn.close();
});
*/

testPerm({ net: true }, async function netDoubleCloseRead(): Promise<void> {
/* TODO Fix broken test.
testPerm({ net: true }, async function netDoubleCloseRead() {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(
async (conn): Promise<void> => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
}
);
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
});
const conn = await Deno.dial("tcp", addr);
conn.closeRead(); // closing read
let err;
Expand All @@ -148,18 +131,18 @@ testPerm({ net: true }, async function netDoubleCloseRead(): Promise<void> {
listener.close();
conn.close();
});
*/

testPerm({ net: true }, async function netCloseWriteSuccess(): Promise<void> {
/* TODO Fix broken test.
testPerm({ net: true }, async function netCloseWriteSuccess() {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(
async (conn): Promise<void> => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
}
);
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
});
const conn = await Deno.dial("tcp", addr);
conn.closeWrite(); // closing write
const buf = new Uint8Array(1024);
Expand All @@ -183,17 +166,17 @@ testPerm({ net: true }, async function netCloseWriteSuccess(): Promise<void> {
listener.close();
conn.close();
});
*/

testPerm({ net: true }, async function netDoubleCloseWrite(): Promise<void> {
/* TODO Fix broken test.
testPerm({ net: true }, async function netDoubleCloseWrite() {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(
async (conn): Promise<void> => {
await closeDeferred.promise;
conn.close();
}
);
listener.accept().then(async conn => {
await closeDeferred.promise;
conn.close();
});
const conn = await Deno.dial("tcp", addr);
conn.closeWrite(); // closing write
let err;
Expand All @@ -210,3 +193,4 @@ testPerm({ net: true }, async function netDoubleCloseWrite(): Promise<void> {
listener.close();
conn.close();
});
*/

0 comments on commit 1af02b4

Please sign in to comment.