Skip to content

Commit

Permalink
feature(rust): Implement multi-character comment support in read_csv p…
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrybugakov committed Nov 19, 2023
1 parent 84b0cba commit 22b1f5d
Show file tree
Hide file tree
Showing 21 changed files with 206 additions and 115 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use polars_core::prelude::*;
use polars_time::prelude::*;
#[cfg(feature = "temporal")]
use rayon::prelude::*;
pub use read::{CsvEncoding, CsvReader, NullValues};
pub use read::{CommentPrefix, CsvEncoding, CsvReader, NullValues};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub use write::{BatchedWriter, CsvWriter, QuoteStyle};
Expand Down
25 changes: 17 additions & 8 deletions crates/polars-io/src/csv/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::prelude::*;
use super::buffer::*;
use crate::csv::read::NullValuesCompiled;
use crate::csv::splitfields::SplitFields;
use crate::csv::CommentPrefix;

/// Skip the utf-8 Byte Order Mark.
/// credits to csv-core
Expand All @@ -16,6 +17,17 @@ pub(crate) fn skip_bom(input: &[u8]) -> &[u8] {
}
}

/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
///
/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
pub(crate) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
match comment_prefix {
Some(CommentPrefix::Single(c)) => line.starts_with(&[*c]),
Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
None => false,
}
}

/// Find the nearest next line position.
/// Does not check for new line characters embedded in String fields.
pub(crate) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
Expand Down Expand Up @@ -351,7 +363,7 @@ pub(super) fn parse_lines<'a>(
mut bytes: &'a [u8],
offset: usize,
separator: u8,
comment_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
missing_is_null: bool,
Expand Down Expand Up @@ -400,13 +412,10 @@ pub(super) fn parse_lines<'a>(
}

// deal with comments
if let Some(c) = comment_char {
// line is a comment -> skip
if bytes[0] == c {
let bytes_rem = skip_this_line(bytes, quote_char, eol_char);
bytes = bytes_rem;
continue;
}
if is_comment_line(bytes, comment_prefix) {
let bytes_rem = skip_this_line(bytes, quote_char, eol_char);
bytes = bytes_rem;
continue;
}

// Every line we only need to parse the columns that are projected.
Expand Down
55 changes: 47 additions & 8 deletions crates/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@ pub enum NullValues {
Named(Vec<(String, String)>),
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CommentPrefix {
/// A single byte character that indicates the start of a comment line.
Single(u8),
/// A string that indicates the start of a comment line.
/// This allows for multiple characters to be used as a comment identifier.
Multi(String),
}

impl CommentPrefix {
/// Creates a new `CommentPrefix` for the `Single` variant.
pub fn new_single(c: u8) -> Self {
CommentPrefix::Single(c)
}

/// Creates a new `CommentPrefix`. If `Multi` variant is used and the string is longer
/// than 5 characters, it will return `None`.
pub fn new_multi(s: String) -> Option<Self> {
if s.len() <= 5 {
Some(CommentPrefix::Multi(s))
} else {
None
}
}
}

pub(super) enum NullValuesCompiled {
/// A single value that's used for all columns
AllColumnsSingle(String),
Expand Down Expand Up @@ -118,7 +145,7 @@ where
dtype_overwrite: Option<&'a [DataType]>,
sample_size: usize,
chunk_size: usize,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
null_values: Option<NullValues>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
quote_char: Option<u8>,
Expand Down Expand Up @@ -210,9 +237,21 @@ where
self
}

/// Set the comment character. Lines starting with this character will be ignored.
pub fn with_comment_char(mut self, comment_char: Option<u8>) -> Self {
self.comment_char = comment_char;
/// Set the comment prefix for this instance. Lines starting with this prefix will be ignored.
pub fn with_comment_prefix(mut self, comment_prefix: Option<&str>) -> Self {
self.comment_prefix = comment_prefix.map(|s| {
if s.len() == 1 && s.chars().next().unwrap().is_ascii() {
CommentPrefix::Single(s.as_bytes()[0])
} else {
CommentPrefix::Multi(s.to_string())
}
});
self
}

/// Sets the comment prefix from `CsvParserOptions` for internal initialization.
pub fn _with_comment_prefix(mut self, comment_prefix: Option<CommentPrefix>) -> Self {
self.comment_prefix = comment_prefix;
self
}

Expand Down Expand Up @@ -370,7 +409,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
self.sample_size,
self.chunk_size,
self.low_memory,
self.comment_char,
std::mem::take(&mut self.comment_prefix),
self.quote_char,
self.eol_char,
std::mem::take(&mut self.null_values),
Expand Down Expand Up @@ -487,7 +526,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
None,
&mut self.skip_rows_before_header,
self.skip_rows_after_header,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.null_values.as_ref(),
Expand Down Expand Up @@ -516,7 +555,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
None,
&mut self.skip_rows_before_header,
self.skip_rows_after_header,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.null_values.as_ref(),
Expand Down Expand Up @@ -556,7 +595,7 @@ where
sample_size: 1024,
chunk_size: 1 << 18,
low_memory: false,
comment_char: None,
comment_prefix: None,
eol_char: b'\n',
null_values: None,
missing_is_null: true,
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/csv/read_impl/batched_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl<'a> CoreReader<'a> {
projection,
starting_point_offset,
row_count: self.row_count,
comment_char: self.comment_char,
comment_prefix: self.comment_prefix,
quote_char: self.quote_char,
eol_char: self.eol_char,
null_values: self.null_values,
Expand Down Expand Up @@ -182,7 +182,7 @@ pub struct BatchedCsvReaderMmap<'a> {
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_count: Option<RowCount>,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValuesCompiled>,
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<'a> BatchedCsvReaderMmap<'a> {
bytes_offset_thread,
self.quote_char,
self.eol_char,
self.comment_char,
self.comment_prefix.as_ref(),
self.chunk_size,
&self.str_capacities,
self.encoding,
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/csv/read_impl/batched_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl<'a> CoreReader<'a> {
projection,
starting_point_offset,
row_count: self.row_count,
comment_char: self.comment_char,
comment_prefix: self.comment_prefix,
quote_char: self.quote_char,
eol_char: self.eol_char,
null_values: self.null_values,
Expand Down Expand Up @@ -265,7 +265,7 @@ pub struct BatchedCsvReaderRead<'a> {
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_count: Option<RowCount>,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValuesCompiled>,
Expand Down Expand Up @@ -337,7 +337,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
0,
self.quote_char,
self.eol_char,
self.comment_char,
self.comment_prefix.as_ref(),
self.chunk_size,
&self.str_capacities,
self.encoding,
Expand Down
29 changes: 14 additions & 15 deletions crates/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use rayon::prelude::*;

use crate::csv::buffer::*;
use crate::csv::parser::*;
use crate::csv::read::NullValuesCompiled;
use crate::csv::read::{CommentPrefix, NullValuesCompiled};
use crate::csv::utils::*;
use crate::csv::{CsvEncoding, NullValues};
use crate::mmap::ReaderBytes;
Expand Down Expand Up @@ -109,7 +109,7 @@ pub(crate) struct CoreReader<'a> {
sample_size: usize,
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValuesCompiled>,
Expand Down Expand Up @@ -198,7 +198,7 @@ impl<'a> CoreReader<'a> {
sample_size: usize,
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValues>,
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<'a> CoreReader<'a> {
schema_overwrite.as_deref(),
&mut skip_rows,
skip_rows_after_header,
comment_char,
comment_prefix.as_ref(),
quote_char,
eol_char,
null_values.as_ref(),
Expand Down Expand Up @@ -299,7 +299,7 @@ impl<'a> CoreReader<'a> {
sample_size,
chunk_size,
low_memory,
comment_char,
comment_prefix,
quote_char,
eol_char,
null_values,
Expand Down Expand Up @@ -342,14 +342,13 @@ impl<'a> CoreReader<'a> {

if self.skip_rows_after_header > 0 {
for _ in 0..self.skip_rows_after_header {
let pos = match bytes.first() {
Some(first) if Some(*first) == self.comment_char => {
next_line_position_naive(bytes, eol_char)
},
let pos = if is_comment_line(bytes, self.comment_prefix.as_ref()) {
next_line_position_naive(bytes, eol_char)
} else {
// we don't pass expected fields
// as we want to skip all rows
// no matter the no. of fields
_ => next_line_position(bytes, None, self.separator, self.quote_char, eol_char),
next_line_position(bytes, None, self.separator, self.quote_char, eol_char)
}
.ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;

Expand Down Expand Up @@ -598,7 +597,7 @@ impl<'a> CoreReader<'a> {
local_bytes,
offset,
self.separator,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.missing_is_null,
Expand Down Expand Up @@ -670,7 +669,7 @@ impl<'a> CoreReader<'a> {
bytes_offset_thread,
self.quote_char,
self.eol_char,
self.comment_char,
self.comment_prefix.as_ref(),
capacity,
&str_capacities,
self.encoding,
Expand Down Expand Up @@ -716,7 +715,7 @@ impl<'a> CoreReader<'a> {
remaining_bytes,
0,
self.separator,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.missing_is_null,
Expand Down Expand Up @@ -800,7 +799,7 @@ fn read_chunk(
bytes_offset_thread: usize,
quote_char: Option<u8>,
eol_char: u8,
comment_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
capacity: usize,
str_capacities: &[RunningSize],
encoding: CsvEncoding,
Expand Down Expand Up @@ -835,7 +834,7 @@ fn read_chunk(
local_bytes,
offset,
separator,
comment_char,
comment_prefix,
quote_char,
eol_char,
missing_is_null,
Expand Down

0 comments on commit 22b1f5d

Please sign in to comment.