Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a StreamWithState streamer #61

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/map.rs
Expand Up @@ -632,6 +632,22 @@ impl<'m, A: Automaton> Stream<'m, A> {
}
}

/// A lexicographically ordered stream of key-value from a map
/// along with the states of the automaton.
///
/// The `Stream` type is based on the `StreamWithState`.
pub struct StreamWithState<'m, A=AlwaysMatch>(raw::StreamWithState<'m, A>) where A: Automaton;

impl<'a, 'm, A: 'a + Automaton> Streamer<'a> for StreamWithState<'m, A>
where A::State: Clone
{
type Item = (&'a [u8], u64, A::State);

fn next(&'a mut self) -> Option<Self::Item> {
self.0.next().map(|(key, out, state)| (key, out.value(), state))
}
}

/// A lexicographically ordered stream of keys from a map.
///
/// The `'m` lifetime parameter refers to the lifetime of the underlying map.
Expand Down Expand Up @@ -693,6 +709,12 @@ impl<'m, A: Automaton> StreamBuilder<'m, A> {
pub fn lt<T: AsRef<[u8]>>(self, bound: T) -> Self {
StreamBuilder(self.0.lt(bound))
}

/// Return this builder and gives the automaton states
/// along with the results.
pub fn with_state(self) -> StreamWithStateBuilder<'m, A> {
StreamWithStateBuilder(self.0.with_state())
}
}

impl<'m, 'a, A: Automaton> IntoStreamer<'a> for StreamBuilder<'m, A> {
Expand All @@ -704,6 +726,32 @@ impl<'m, 'a, A: Automaton> IntoStreamer<'a> for StreamBuilder<'m, A> {
}
}

/// A builder for constructing range queries of streams
/// that returns results along with automaton states.
///
/// Once all bounds are set, one should call `into_stream` to get a
/// `StreamWithState`.
///
/// Bounds are not additive. That is, if `ge` is called twice on the same
/// builder, then the second setting wins.
///
/// The `A` type parameter corresponds to an optional automaton to filter
/// the stream. By default, no filtering is done.
///
/// The `'m` lifetime parameter refers to the lifetime of the underlying map.
pub struct StreamWithStateBuilder<'m, A=AlwaysMatch>(raw::StreamWithStateBuilder<'m, A>);

impl<'m, 'a, A: 'a + Automaton> IntoStreamer<'a> for StreamWithStateBuilder<'m, A>
where A::State: Clone
{
type Item = (&'a [u8], u64, A::State);
type Into = StreamWithState<'m, A>;

fn into_stream(self) -> Self::Into {
StreamWithState(self.0.into_stream())
}
}

/// A builder for collecting map streams on which to perform set operations
/// on the keys of maps.
///
Expand Down
230 changes: 157 additions & 73 deletions src/raw/mod.rs
Expand Up @@ -624,6 +624,17 @@ impl<'f, A: Automaton> StreamBuilder<'f, A> {
self.max = Bound::Excluded(bound.as_ref().to_owned());
self
}

/// Return this builder and gives the automaton states
/// along with the results.
pub fn with_state(self) -> StreamWithStateBuilder<'f, A> {
StreamWithStateBuilder {
fst: self.fst,
aut: self.aut,
min: self.min,
max: self.max,
}
}
}

impl<'a, 'f, A: Automaton> IntoStreamer<'a> for StreamBuilder<'f, A> {
Expand All @@ -635,6 +646,37 @@ impl<'a, 'f, A: Automaton> IntoStreamer<'a> for StreamBuilder<'f, A> {
}
}

/// A builder for constructing range queries of streams
/// that returns results along with automaton states.
///
/// Once all bounds are set, one should call `into_stream` to get a
/// `StreamWithState`.
///
/// Bounds are not additive. That is, if `ge` is called twice on the same
/// builder, then the second setting wins.
///
/// The `A` type parameter corresponds to an optional automaton to filter
/// the stream. By default, no filtering is done.
///
/// The `'f` lifetime parameter refers to the lifetime of the underlying fst.
pub struct StreamWithStateBuilder<'f, A=AlwaysMatch> {
fst: &'f Fst,
aut: A,
min: Bound,
max: Bound,
}

impl<'a, 'f, A: 'a + Automaton> IntoStreamer<'a> for StreamWithStateBuilder<'f, A>
where A::State: Clone
{
type Item = (&'a [u8], Output, A::State);
type Into = StreamWithState<'f, A>;

fn into_stream(self) -> StreamWithState<'f, A> {
StreamWithState::new(self.fst, self.aut, self.min, self.max)
}
}

#[derive(Debug)]
enum Bound {
Included(Vec<u8>),
Expand Down Expand Up @@ -673,7 +715,91 @@ impl Bound {
/// the stream. By default, no filtering is done.
///
/// The `'f` lifetime parameter refers to the lifetime of the underlying fst.
pub struct Stream<'f, A=AlwaysMatch> where A: Automaton {
pub struct Stream<'f, A=AlwaysMatch>(StreamWithState<'f, A>) where A: Automaton;

impl<'f, A: Automaton> Stream<'f, A> {
fn new(fst: &'f Fst, aut: A, min: Bound, max: Bound) -> Self {
Stream(StreamWithState::new(fst, aut, min, max))
}

/// Convert this stream into a vector of byte strings and outputs.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_byte_vec(mut self) -> Vec<(Vec<u8>, u64)> {
let mut vs = vec![];
while let Some((k, v)) = self.next() {
vs.push((k.to_vec(), v.value()));
}
vs
}

/// Convert this stream into a vector of Unicode strings and outputs.
///
/// If any key is not valid UTF-8, then iteration on the stream is stopped
/// and a UTF-8 decoding error is returned.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_str_vec(mut self) -> Result<Vec<(String, u64)>> {
let mut vs = vec![];
while let Some((k, v)) = self.next() {
let k = String::from_utf8(k.to_vec()).map_err(Error::from)?;
vs.push((k, v.value()));
}
Ok(vs)
}

/// Convert this stream into a vector of byte strings.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_byte_keys(mut self) -> Vec<Vec<u8>> {
let mut vs = vec![];
while let Some((k, _)) = self.next() {
vs.push(k.to_vec());
}
vs
}

/// Convert this stream into a vector of Unicode strings.
///
/// If any key is not valid UTF-8, then iteration on the stream is stopped
/// and a UTF-8 decoding error is returned.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_str_keys(mut self) -> Result<Vec<String>> {
let mut vs = vec![];
while let Some((k, _)) = self.next() {
let k = String::from_utf8(k.to_vec()).map_err(Error::from)?;
vs.push(k);
}
Ok(vs)
}

/// Convert this stream into a vector of outputs.
pub fn into_values(mut self) -> Vec<u64> {
let mut vs = vec![];
while let Some((_, v)) = self.next() {
vs.push(v.value());
}
vs
}
}

impl<'f, 'a, A: Automaton> Streamer<'a> for Stream<'f, A> {
type Item = (&'a [u8], Output);

fn next(&'a mut self) -> Option<Self::Item> {
self.0.next(|_| ()).map(|(key, out, _)| (key, out))
}
}

/// A lexicographically ordered stream from an fst
/// of key-value pairs along with the state of the automaton.
///
/// The `A` type parameter corresponds to an optional automaton to filter
/// the stream. By default, no filtering is done.
///
/// The `'f` lifetime parameter refers to the lifetime of the underlying fst.
pub struct StreamWithState<'f, A=AlwaysMatch> where A: Automaton {
fst: &'f Fst,
aut: A,
inp: Vec<u8>,
Expand All @@ -690,9 +816,9 @@ struct StreamState<'f, S> {
aut_state: S,
}

impl<'f, A: Automaton> Stream<'f, A> {
impl<'f, A: Automaton> StreamWithState<'f, A> {
fn new(fst: &'f Fst, aut: A, min: Bound, max: Bound) -> Self {
let mut rdr = Stream {
let mut rdr = StreamWithState {
fst: fst,
aut: aut,
inp: Vec::with_capacity(16),
Expand Down Expand Up @@ -794,84 +920,30 @@ impl<'f, A: Automaton> Stream<'f, A> {
}
}

/// Convert this stream into a vector of byte strings and outputs.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_byte_vec(mut self) -> Vec<(Vec<u8>, u64)> {
let mut vs = vec![];
while let Some((k, v)) = self.next() {
vs.push((k.to_vec(), v.value()));
}
vs
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why were all of these methods removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those weren't removed, it is because git doesn't understand that it is a new struct named StreamWithState that I created and I didn't implement those methods on it to keep the code easier to follow.


/// Convert this stream into a vector of Unicode strings and outputs.
///
/// If any key is not valid UTF-8, then iteration on the stream is stopped
/// and a UTF-8 decoding error is returned.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_str_vec(mut self) -> Result<Vec<(String, u64)>> {
let mut vs = vec![];
while let Some((k, v)) = self.next() {
let k = String::from_utf8(k.to_vec()).map_err(Error::from)?;
vs.push((k, v.value()));
}
Ok(vs)
}

/// Convert this stream into a vector of byte strings.
/// This method only exist because the `map::Stream` implementation
/// use a `map::StreamWithState` internally and the
/// `StreamWithState::next` method require that the `Automaton::State`
/// implement `Clone` to be returned.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_byte_keys(mut self) -> Vec<Vec<u8>> {
let mut vs = vec![];
while let Some((k, _)) = self.next() {
vs.push(k.to_vec());
}
vs
}

/// Convert this stream into a vector of Unicode strings.
///
/// If any key is not valid UTF-8, then iteration on the stream is stopped
/// and a UTF-8 decoding error is returned.
///
/// Note that this creates a new allocation for every key in the stream.
pub fn into_str_keys(mut self) -> Result<Vec<String>> {
let mut vs = vec![];
while let Some((k, _)) = self.next() {
let k = String::from_utf8(k.to_vec()).map_err(Error::from)?;
vs.push(k);
}
Ok(vs)
}

/// Convert this stream into a vector of outputs.
pub fn into_values(mut self) -> Vec<u64> {
let mut vs = vec![];
while let Some((_, v)) = self.next() {
vs.push(v.value());
}
vs
}
}

impl<'f, 'a, A: Automaton> Streamer<'a> for Stream<'f, A> {
type Item = (&'a [u8], Output);

fn next(&'a mut self) -> Option<Self::Item> {
/// So using a function trick we can keep one implementation of this
/// algorithm, specify the "transform" method, the `A::State` can be cloned
/// or be ignored depending of the function.
#[inline]
fn next<F, T>(&mut self, transform: F) -> Option<(&[u8], Output, T)>
where F: Fn(&A::State) -> T
{
if let Some(out) = self.empty_output.take() {
if self.end_at.exceeded_by(&[]) {
self.stack.clear();
return None;
}
if self.aut.is_match(&self.aut.start()) {
return Some((&[], out));
let start = self.aut.start();
if self.aut.is_match(&start) {
return Some((&[], out, transform(&start)));
}
}
while let Some(state) = self.stack.pop() {
if state.trans >= state.node.len()
|| !self.aut.can_match(&state.aut_state) {
if state.trans >= state.node.len() || !self.aut.can_match(&state.aut_state) {
if state.node.addr() != self.fst.root_addr {
self.inp.pop().unwrap();
}
Expand All @@ -886,6 +958,7 @@ impl<'f, 'a, A: Automaton> Streamer<'a> for Stream<'f, A> {
self.stack.push(StreamState {
trans: state.trans + 1, .. state
});
let ns = transform(&next_state);
self.stack.push(StreamState {
node: next_node,
trans: 0,
Expand All @@ -898,13 +971,24 @@ impl<'f, 'a, A: Automaton> Streamer<'a> for Stream<'f, A> {
return None;
}
if next_node.is_final() && is_match {
return Some((&self.inp, out.cat(next_node.final_output())));
let out = out.cat(next_node.final_output());
return Some((&self.inp, out, ns));
}
}
None
}
}

impl<'f, 'a, A: 'a + Automaton> Streamer<'a> for StreamWithState<'f, A>
where A::State: Clone
{
type Item = (&'a [u8], Output, A::State);

fn next(&'a mut self) -> Option<Self::Item> {
self.next(Clone::clone)
}
}

/// An output is a value that is associated with a key in a finite state
/// transducer.
///
Expand Down