Skip to content

Commit

Permalink
[User]Reader wraps SystemReader, not RawReader (#567)
Browse files Browse the repository at this point in the history
* UserReader now wraps SystemReader, not RawReader.
* Removed string, symbol, and int caches from SystemReader

---------

Co-authored-by: Zack Slayton <zslayton@amazon.com>
  • Loading branch information
zslayton and Zack Slayton committed Jun 14, 2023
1 parent 7bc5c74 commit 82f17fc
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 343 deletions.
269 changes: 40 additions & 229 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ use std::ops::Range;
use delegate::delegate;

use crate::binary::constants::v1_0::IVM;
use crate::constants::v1_0::system_symbol_ids;
use crate::binary::non_blocking::raw_binary_reader::RawBinaryReader;
use crate::data_source::ToIonDataSource;
use crate::element::{Blob, Clob};
use crate::raw_reader::{RawReader, RawStreamItem};
use crate::raw_reader::{Expandable, RawReader};
use crate::raw_symbol_token::RawSymbolToken;
use crate::result::{decoding_error, decoding_error_raw, IonResult};
use crate::result::{decoding_error, IonResult};
use crate::stream_reader::IonReader;
use crate::symbol_table::SymbolTable;
use crate::types::{Decimal, Int, Symbol, Timestamp};
use crate::{BlockingRawBinaryReader, BlockingRawTextReader, IonType};
use crate::{
BlockingRawBinaryReader, BlockingRawTextReader, IonType, SystemReader, SystemStreamItem,
};
use std::fmt::{Display, Formatter};

use crate::types::Str;
Expand Down Expand Up @@ -86,18 +88,12 @@ impl ReaderBuilder {

fn make_text_reader<'a, I: 'a + ToIonDataSource>(data: I) -> IonResult<Reader<'a>> {
let raw_reader = Box::new(BlockingRawTextReader::new(data)?);
Ok(Reader {
raw_reader,
symbol_table: SymbolTable::new(),
})
Ok(Reader::new(raw_reader))
}

fn make_binary_reader<'a, I: 'a + ToIonDataSource>(data: I) -> IonResult<Reader<'a>> {
let raw_reader = Box::new(BlockingRawBinaryReader::new(data)?);
Ok(Reader {
raw_reader,
symbol_table: SymbolTable::new(),
})
Ok(Reader::new(raw_reader))
}
}

Expand All @@ -116,15 +112,13 @@ pub type Reader<'a> = UserReader<Box<dyn RawReader + 'a>>;
/// Reader itself is format-agnostic; all format-specific logic is handled by the
/// wrapped [RawReader] implementation.
pub struct UserReader<R: RawReader> {
raw_reader: R,
symbol_table: SymbolTable,
system_reader: SystemReader<R>,
}

impl<R: RawReader> UserReader<R> {
pub fn new(raw_reader: R) -> UserReader<R> {
UserReader {
raw_reader,
symbol_table: SymbolTable::new(),
system_reader: SystemReader::new(raw_reader),
}
}
}
Expand Down Expand Up @@ -182,260 +176,77 @@ impl Display for StreamItem {

impl<R: RawReader> UserReader<R> {
pub fn read_raw_symbol(&mut self) -> IonResult<RawSymbolToken> {
self.raw_reader.read_symbol()
self.system_reader.read_raw_symbol()
}

pub fn raw_field_name_token(&mut self) -> IonResult<RawSymbolToken> {
self.raw_reader.field_name()
self.system_reader.raw_field_name_token()
}

fn read_symbol_table(&mut self) -> IonResult<()> {
self.raw_reader.step_in()?;

let mut is_append = false;
let mut new_symbols = vec![];

// It's illegal for a symbol table to have multiple `symbols` or `imports` fields.
// Keep track of whether we've already encountered them.
let mut has_found_symbols_field = false;
let mut has_found_imports_field = false;

loop {
let ion_type = match self.raw_reader.next()? {
RawStreamItem::Value(ion_type) => ion_type,
RawStreamItem::Null(_) => continue,
RawStreamItem::Nothing => break,
RawStreamItem::VersionMarker(major, minor) => {
return decoding_error(format!(
"encountered Ion version marker for v{major}.{minor} in symbol table"
))
}
};

let field_id = self
.raw_reader
.field_name()
.expect("No field ID found inside $ion_symbol_table struct.");
match (field_id, ion_type) {
// The field name is either SID 6 or the text 'imports' and the
// field value is a non-null List
(symbol, IonType::List)
if symbol.matches(system_symbol_ids::IMPORTS, "imports") =>
{
// TODO: SST imports. This implementation only supports local symbol
// table imports and appends.
return decoding_error("importing shared symbol tables is not yet supported");
}
// The field name is either SID 6 or the text 'imports' and the
// field value is a non-null symbol
(symbol, IonType::Symbol)
if symbol.matches(system_symbol_ids::IMPORTS, "imports") =>
{
if has_found_imports_field {
return decoding_error("symbol table had multiple 'imports' fields");
}
has_found_imports_field = true;
let import_symbol = self.raw_reader.read_symbol()?;
if !import_symbol.matches(3, "$ion_symbol_table") {
// Field name `imports` with a symbol other than $ion_symbol_table is ignored
continue;
}
is_append = true;
}
// The field name is either SID 7 or the text 'imports' and the
// field value is a non-null list
(symbol, IonType::List)
if symbol.matches(system_symbol_ids::SYMBOLS, "symbols") =>
{
if has_found_symbols_field {
return decoding_error("symbol table had multiple 'symbols' fields");
}
has_found_symbols_field = true;
self.raw_reader.step_in()?;
loop {
use RawStreamItem::*;
match self.raw_reader.next()? {
Value(IonType::String) => {
new_symbols.push(Some(self.raw_reader.read_string()?));
}
Value(_) | Null(_) => {
// If we encounter a non-string or null, add a placeholder
new_symbols.push(None);
}
VersionMarker(_, _) => {
return decoding_error("Found IVM in symbol table.")
}
Nothing => break,
}
}
self.raw_reader.step_out()?;
}
something_else => {
unimplemented!("No support for {:?}", something_else);
}
}
}

if is_append {
// We're adding new symbols to the end of the symbol table.
for maybe_text in new_symbols.drain(..) {
let _sid = self.symbol_table.intern_or_add_placeholder(maybe_text);
}
} else {
// The symbol table has been set by defining new symbols without importing the current
// symbol table.
self.symbol_table.reset();
for maybe_text in new_symbols.drain(..) {
let _sid = self.symbol_table.intern_or_add_placeholder(maybe_text);
}
}

self.raw_reader.step_out()?;
Ok(())
}

fn raw_annotations(&mut self) -> impl Iterator<Item = RawSymbolToken> + '_ {
// RawReader implementations do not attempt to resolve each annotation into text.
// Additionally, they perform all I/O related to annotations in their implementations
// of Reader::next. As such, it's safe to call `unwrap()` on each raw annotation.
self.raw_reader.annotations().map(|a| a.unwrap())
fn raw_annotations(&mut self) -> impl Iterator<Item = IonResult<RawSymbolToken>> + '_ {
self.system_reader.raw_annotations()
}

pub fn symbol_table(&self) -> &SymbolTable {
&self.symbol_table
self.system_reader.symbol_table()
}
}

impl<R: RawReader> IonReader for UserReader<R> {
type Item = StreamItem;
type Symbol = Symbol;

fn current(&self) -> Self::Item {
if let Some(ion_type) = self.ion_type() {
return if self.is_null() {
StreamItem::Null(ion_type)
} else {
StreamItem::Value(ion_type)
};
}
StreamItem::Nothing
}

/// Advances the raw reader to the next user-level Ion value, processing any system-level directives
/// encountered along the way.
// v-- Clippy complains that `next` resembles `Iterator::next()`
#[allow(clippy::should_implement_trait)]
fn next(&mut self) -> IonResult<Self::Item> {
use RawStreamItem::*;
use SystemStreamItem::*;
loop {
match self.raw_reader.next()? {
VersionMarker(1, 0) => {
self.symbol_table.reset();
}
VersionMarker(major, minor) => {
return decoding_error(format!(
"Encountered a version marker for v{major}.{minor}, but only v1.0 is supported."
));
}
Value(IonType::Struct) => {
// Top-level structs whose _first_ annotation is $ion_symbol_table are
// interpreted as local symbol tables. Other trailing annotations (if any) are
// ignored. If the first annotation is something other than `$ion_symbol_table`,
// the struct is considered user data even if one of the trailing annotations
// is `$ion_symbol_table`. For more information, see this section of the spec:
// https://amazon-ion.github.io/ion-docs/docs/symbols.html#local-symbol-tables
if self.raw_reader.depth() == 0 {
let is_symtab = match self.raw_reader.annotations().next() {
Some(Err(error)) => return Err(error),
Some(Ok(symbol))
if symbol.matches(
system_symbol_ids::ION_SYMBOL_TABLE,
"$ion_symbol_table",
) =>
{
true
}
_ => false,
};
// This logic cannot be merged into the `match` statement above because
// `self.read_symbol_table()` requires a mutable borrow which is not
// possible while iterating over the reader's annotations.
if is_symtab {
self.read_symbol_table()?;
continue;
}
}
return Ok(StreamItem::Value(IonType::Struct));
}
Value(ion_type) => return Ok(StreamItem::Value(ion_type)),
Null(ion_type) => return Ok(StreamItem::Null(ion_type)),
Nothing => return Ok(StreamItem::Nothing),
}
}
}

fn field_name(&self) -> IonResult<Self::Symbol> {
match self.raw_reader.field_name()? {
RawSymbolToken::SymbolId(sid) => {
self.symbol_table.symbol_for(sid).cloned().ok_or_else(|| {
decoding_error_raw(format!("encountered field ID with unknown text: ${sid}"))
})
}
RawSymbolToken::Text(text) => Ok(Symbol::owned(text)),
// If the system reader encounters an encoding artifact like a symbol table or IVM,
// keep going until we find a value or exhaust the stream.
let item = match self.system_reader.next()? {
VersionMarker(_, _) | SymbolTableValue(_) | SymbolTableNull(_) => continue,
Value(ion_type) => StreamItem::Value(ion_type),
Null(ion_type) => StreamItem::Null(ion_type),
Nothing => StreamItem::Nothing,
};
return Ok(item);
}
}

fn annotations<'a>(&'a self) -> Box<dyn Iterator<Item = IonResult<Self::Symbol>> + 'a> {
let iterator = self
.raw_reader
.annotations()
.map(move |raw_token| match raw_token? {
RawSymbolToken::SymbolId(sid) => {
self.symbol_table.symbol_for(sid).cloned().ok_or_else(|| {
decoding_error_raw(format!("found annotation ID with unknown text: ${sid}"))
})
}
RawSymbolToken::Text(text) => Ok(Symbol::owned(text)),
});
Box::new(iterator)
}

fn read_symbol(&mut self) -> IonResult<Self::Symbol> {
match self.raw_reader.read_symbol()? {
RawSymbolToken::SymbolId(symbol_id) => {
if let Some(symbol) = self.symbol_table.symbol_for(symbol_id) {
Ok(symbol.clone())
} else {
decoding_error(format!(
"Found symbol ID ${symbol_id}, which is not defined."
))
}
}
RawSymbolToken::Text(text) => Ok(Symbol::owned(text)),
fn current(&self) -> Self::Item {
if let Some(ion_type) = self.ion_type() {
return if self.is_null() {
StreamItem::Null(ion_type)
} else {
StreamItem::Value(ion_type)
};
}
StreamItem::Nothing
}

// The Reader needs to expose many of the same functions as the Cursor, but only some of those
// need to be re-defined to allow for system value processing. Any method listed here will be
// delegated to self.raw_reader directly.
delegate! {
to self.raw_reader {
to self.system_reader {
fn is_null(&self) -> bool;
fn ion_version(&self) -> (u8, u8);
fn ion_type(&self) -> Option<IonType>;
fn annotations<'a>(&'a self) -> Box<dyn Iterator<Item = IonResult<Symbol>> + 'a>;
fn read_null(&mut self) -> IonResult<IonType>;
fn read_bool(&mut self) -> IonResult<bool>;
fn read_int(&mut self) -> IonResult<Int>;
fn read_i64(&mut self) -> IonResult<i64>;
fn read_f32(&mut self) -> IonResult<f32>;
fn read_f64(&mut self) -> IonResult<f64>;
fn read_decimal(&mut self) -> IonResult<Decimal>;
fn read_symbol(&mut self) -> IonResult<Self::Symbol>;
fn read_string(&mut self) -> IonResult<Str>;
fn read_str(&mut self) -> IonResult<&str>;
fn read_blob(&mut self) -> IonResult<Blob>;
fn read_clob(&mut self) -> IonResult<Clob>;
fn read_timestamp(&mut self) -> IonResult<Timestamp>;
fn step_in(&mut self) -> IonResult<()>;
fn field_name(&self) -> IonResult<Symbol>;
fn step_out(&mut self) -> IonResult<()>;
fn parent_type(&self) -> Option<IonType>;
fn depth(&self) -> usize;
Expand All @@ -445,9 +256,9 @@ impl<R: RawReader> IonReader for UserReader<R> {

/// Functionality that is only available if the data source we're reading from is in-memory, like
/// a `Vec<u8>` or `&[u8]`.
impl<T: AsRef<[u8]>> UserReader<BlockingRawBinaryReader<io::Cursor<T>>> {
impl<T: AsRef<[u8]> + Expandable> UserReader<RawBinaryReader<T>> {
delegate! {
to self.raw_reader {
to self.system_reader {
pub fn raw_bytes(&self) -> Option<&[u8]>;
pub fn raw_field_id_bytes(&self) -> Option<&[u8]>;
pub fn raw_header_bytes(&self) -> Option<&[u8]>;
Expand Down Expand Up @@ -503,7 +314,7 @@ mod tests {

// Prepends an Ion 1.0 IVM to the provided data and then creates a BinaryIonCursor over it
fn raw_binary_reader_for(bytes: &[u8]) -> BlockingRawBinaryReader<TestDataSource> {
use RawStreamItem::*;
use crate::RawStreamItem::*;
let mut raw_reader =
BlockingRawBinaryReader::new(data_source_for(bytes)).expect("unable to create reader");
assert_eq!(raw_reader.ion_type(), None);
Expand Down
Loading

0 comments on commit 82f17fc

Please sign in to comment.