Skip to content

Commit

Permalink
Add seek and implement Seeker on File (#1797)
Browse files Browse the repository at this point in the history
This patch contains a special hack that circumvents the current tokio
seek problem.

tokio `seek` is implemented to take ownership of the original File and
emit a new one in its future, which conflicts with the design of
ResourceTable.

To avoid the problem, the current hack makes the FsFile resource
an Option which we could `take` the value ownership out of it. We then
convert the tokio File into a Rust std File, perform the seek, and then
put it back into the resource.

This might be able to drop this hack after
tokio-rs/tokio#785 lands.
  • Loading branch information
kevinkassimo authored and ry committed Feb 18, 2019
1 parent 97e29e3 commit 077af20
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 13 deletions.
2 changes: 2 additions & 0 deletions js/deno.ts
Expand Up @@ -11,13 +11,15 @@ export {
stderr,
read,
write,
seek,
close,
OpenMode
} from "./files";
export {
copy,
toAsyncIterator,
ReadResult,
SeekMode,
Reader,
Writer,
Closer,
Expand Down
25 changes: 23 additions & 2 deletions js/files.ts
@@ -1,12 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { Reader, Writer, Closer, ReadResult } from "./io";
import { Reader, Writer, Seeker, Closer, ReadResult, SeekMode } from "./io";
import * as dispatch from "./dispatch";
import * as msg from "gen/msg_generated";
import { assert } from "./util";
import * as flatbuffers from "./flatbuffers";

/** The Deno abstraction for reading and writing files. */
export class File implements Reader, Writer, Closer {
export class File implements Reader, Writer, Seeker, Closer {
constructor(readonly rid: number) {}

write(p: Uint8Array): Promise<number> {
Expand All @@ -17,6 +17,10 @@ export class File implements Reader, Writer, Closer {
return read(this.rid, p);
}

seek(offset: number, whence: SeekMode): Promise<void> {
return seek(this.rid, offset, whence);
}

close(): void {
close(this.rid);
}
Expand Down Expand Up @@ -123,6 +127,23 @@ export async function write(rid: number, p: Uint8Array): Promise<number> {
return res.nbyte();
}

/** Seek a file ID to the given offset under mode given by `whence`.
*
*/
export async function seek(
rid: number,
offset: number,
whence: SeekMode
): Promise<void> {
const builder = flatbuffers.createBuilder();
msg.Seek.startSeek(builder);
msg.Seek.addRid(builder, rid);
msg.Seek.addOffset(builder, offset);
msg.Seek.addWhence(builder, whence);
const inner = msg.Seek.endSeek(builder);
await dispatch.sendAsync(builder, msg.Any.Seek, inner);
}

/** Close the file ID. */
export function close(rid: number): void {
const builder = flatbuffers.createBuilder();
Expand Down
64 changes: 55 additions & 9 deletions js/files_test.ts
Expand Up @@ -141,15 +141,61 @@ testPerm({ read: true, write: true }, async function openModeWriteRead() {
fileInfo = Deno.statSync(filename);
assertEqual(fileInfo.len, 13);

// TODO: this test is not working, I expect because
// file handle points to the end of file, but ATM
// deno has no seek implementation on Rust side
// assert file can be read
// const buf = new Uint8Array(20);
// const result = await file.read(buf);
// console.log(result.eof, result.nread);
// assertEqual(result.nread, 13);
// file.close();
const buf = new Uint8Array(20);
await file.seek(0, Deno.SeekMode.SEEK_START);
const result = await file.read(buf);
assertEqual(result.nread, 13);
file.close();

await Deno.remove(tempDir, { recursive: true });
});

testPerm({ read: true }, async function seekStart() {
const filename = "tests/hello.txt";
const file = await Deno.open(filename);
// Deliberately move 1 step forward
await file.read(new Uint8Array(1)); // "H"
// Skipping "Hello "
await file.seek(6, Deno.SeekMode.SEEK_START);
const buf = new Uint8Array(6);
await file.read(buf);
const decoded = new TextDecoder().decode(buf);
assertEqual(decoded, "world!");
});

testPerm({ read: true }, async function seekCurrent() {
const filename = "tests/hello.txt";
const file = await Deno.open(filename);
// Deliberately move 1 step forward
await file.read(new Uint8Array(1)); // "H"
// Skipping "ello "
await file.seek(5, Deno.SeekMode.SEEK_CURRENT);
const buf = new Uint8Array(6);
await file.read(buf);
const decoded = new TextDecoder().decode(buf);
assertEqual(decoded, "world!");
});

testPerm({ read: true }, async function seekEnd() {
const filename = "tests/hello.txt";
const file = await Deno.open(filename);
await file.seek(-6, Deno.SeekMode.SEEK_END);
const buf = new Uint8Array(6);
await file.read(buf);
const decoded = new TextDecoder().decode(buf);
assertEqual(decoded, "world!");
});

testPerm({ read: true }, async function seekMode() {
const filename = "tests/hello.txt";
const file = await Deno.open(filename);
let err;
try {
await file.seek(1, -1);
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, Deno.ErrorKind.InvalidSeekMode);
assertEqual(err.name, "InvalidSeekMode");
});
10 changes: 9 additions & 1 deletion js/io.ts
Expand Up @@ -9,6 +9,14 @@ export interface ReadResult {
eof: boolean;
}

// Seek whence values.
// https://golang.org/pkg/io/#pkg-constants
export enum SeekMode {
SEEK_START = 0,
SEEK_CURRENT = 1,
SEEK_END = 2
}

// Reader is the interface that wraps the basic read() method.
// https://golang.org/pkg/io/#Reader
export interface Reader {
Expand Down Expand Up @@ -74,7 +82,7 @@ export interface Seeker {
* any positive offset is legal, but the behavior of subsequent I/O operations
* on the underlying object is implementation-dependent.
*/
seek(offset: number, whence: number): Promise<void>;
seek(offset: number, whence: SeekMode): Promise<void>;
}

// https://golang.org/pkg/io/#ReadCloser
Expand Down
8 changes: 8 additions & 0 deletions src/msg.fbs
Expand Up @@ -65,6 +65,7 @@ union Any {
NowRes,
IsTTY,
IsTTYRes,
Seek,
}

enum ErrorKind: byte {
Expand Down Expand Up @@ -117,6 +118,7 @@ enum ErrorKind: byte {

// custom errors
InvalidUri,
InvalidSeekMode,
}

table Cwd {}
Expand Down Expand Up @@ -496,4 +498,10 @@ table IsTTYRes {
stderr: bool;
}

table Seek {
rid: uint32;
offset: int;
whence: uint;
}

root_type Base;
23 changes: 23 additions & 0 deletions src/ops.rs
Expand Up @@ -126,6 +126,7 @@ pub fn dispatch(
msg::Any::WriteFile => op_write_file,
msg::Any::Now => op_now,
msg::Any::IsTTY => op_is_tty,
msg::Any::Seek => op_seek,
_ => panic!(format!(
"Unhandled message {}",
msg::enum_name_any(inner_type)
Expand Down Expand Up @@ -868,6 +869,28 @@ fn op_write(
}
}

fn op_seek(
_state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: libdeno::deno_buf,
) -> Box<Op> {
assert_eq!(data.len(), 0);
let _cmd_id = base.cmd_id();
let inner = base.inner_as_seek().unwrap();
let rid = inner.rid();
let offset = inner.offset();
let whence = inner.whence();

match resources::lookup(rid) {
None => odd_future(errors::bad_resource()),
Some(resource) => {
let op = resources::seek(resource, offset, whence)
.and_then(move |_| Ok(empty_buf()));
Box::new(op)
}
}
}

fn op_remove(
state: &Arc<IsolateState>,
base: &msg::Base<'_>,
Expand Down
61 changes: 60 additions & 1 deletion src/resources.rs
Expand Up @@ -30,7 +30,7 @@ use futures::Stream;
use hyper;
use std;
use std::collections::HashMap;
use std::io::{Error, Read, Write};
use std::io::{Error, Read, Seek, SeekFrom, Write};
use std::net::{Shutdown, SocketAddr};
use std::process::ExitStatus;
use std::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -565,3 +565,62 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
},
}
}

// TODO(kevinkassimo): revamp this after the following lands:
// https://github.com/tokio-rs/tokio/pull/785
pub fn seek(
resource: Resource,
offset: i32,
whence: u32,
) -> Box<dyn Future<Item = (), Error = DenoError> + Send> {
let mut table = RESOURCE_TABLE.lock().unwrap();
// We take ownership of File here.
// It is put back below while still holding the lock.
let maybe_repr = table.remove(&resource.rid);
match maybe_repr {
None => panic!("bad rid"),
Some(Repr::FsFile(f)) => {
let seek_from = match whence {
0 => SeekFrom::Start(offset as u64),
1 => SeekFrom::Current(offset as i64),
2 => SeekFrom::End(offset as i64),
_ => {
return Box::new(futures::future::err(errors::new(
errors::ErrorKind::InvalidSeekMode,
format!("Invalid seek mode: {}", whence),
)));
}
};
// Trait Clone not implemented on tokio::fs::File,
// so convert to std File first.
let std_file = f.into_std();
// Create a copy and immediately put back.
// We don't want to block other resource ops.
// try_clone() would yield a copy containing the same
// underlying fd, so operations on the copy would also
// affect the one in resource table, and we don't need
// to write back.
let maybe_std_file_copy = std_file.try_clone();
// Insert the entry back with the same rid.
table.insert(
resource.rid,
Repr::FsFile(tokio_fs::File::from_std(std_file)),
);
if maybe_std_file_copy.is_err() {
return Box::new(futures::future::err(DenoError::from(
maybe_std_file_copy.unwrap_err(),
)));
}
let mut std_file_copy = maybe_std_file_copy.unwrap();
return Box::new(futures::future::lazy(move || {
let result = std_file_copy
.seek(seek_from)
.map(|_| {
return ();
}).map_err(DenoError::from);
futures::future::result(result)
}));
}
_ => panic!("cannot seek"),
}
}

0 comments on commit 077af20

Please sign in to comment.