Skip to content

Commit

Permalink
fix(cubestore): CSV import escape sequence
Browse files: browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jan 26, 2021
1 parent 2feb99e commit a3e118e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
32 changes: 22 additions & 10 deletions rust/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl ImportFormat {
mapping.push(
columns
.iter()
.find(|c| c.get_name() == next_column)
.find(|c| c.get_name() == &next_column)
.cloned()
.ok_or(CubeError::user(format!(
"Column {} not found during import in {:?}",
Expand All @@ -81,17 +81,19 @@ impl ImportFormat {
let value = parser.next_value()?;

row.push(match column.get_column_type() {
ColumnType::String => TableValue::String(value.to_string()),
ColumnType::String => TableValue::String(value),
ColumnType::Int => value
.parse()
.map(|v| TableValue::Int(v))
.unwrap_or(TableValue::Null),
ColumnType::Decimal { .. } => BigDecimal::from_str_radix(value, 10)
.map(|d| TableValue::Decimal(d.to_string()))
.unwrap_or(TableValue::Null),
ColumnType::Decimal { .. } => {
BigDecimal::from_str_radix(value.as_str(), 10)
.map(|d| TableValue::Decimal(d.to_string()))
.unwrap_or(TableValue::Null)
}
ColumnType::Bytes => unimplemented!(),
ColumnType::HyperLogLog(_) => unimplemented!(),
ColumnType::Timestamp => timestamp_from_string(value)?,
ColumnType::Timestamp => timestamp_from_string(value.as_str())?,
ColumnType::Float => {
TableValue::Float(value.parse::<f64>()?.to_string())
}
Expand Down Expand Up @@ -123,18 +125,28 @@ impl<'a> CsvLineParser<'a> {
}
}

fn next_value(&mut self) -> Result<&str, CubeError> {
fn next_value(&mut self) -> Result<String, CubeError> {
Ok(if self.remaining.chars().nth(0) == Some('"') {
let closing_index = self.remaining.find("\"").ok_or(CubeError::user(format!(
let mut closing_index = None;
let mut i = 0;
while i < self.remaining.len() {
if i < self.remaining.len() - 1 && &self.remaining[i..(i + 2)] == "\"\"" {
i += 1;
} else if &self.remaining[i..(i + 1)] == "\"" {
closing_index = Some(i);
}
i += 1;
}
let closing_index = closing_index.ok_or(CubeError::user(format!(
"Malformed CSV string: {}",
self.line
)))?;
let res: &str = self.remaining[1..closing_index].as_ref();
let res: String = self.remaining[1..closing_index].replace("\"\"", "\"");
self.remaining = self.remaining[closing_index..].as_ref();
res
} else {
let next_comma = self.remaining.find(",").unwrap_or(self.remaining.len());
let res: &str = self.remaining[0..next_comma].as_ref();
let res: String = self.remaining[0..next_comma].to_string();
self.remaining = self.remaining[next_comma..].as_ref();
res
})
Expand Down
11 changes: 7 additions & 4 deletions rust/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2075,21 +2075,24 @@ mod tests {

let mut file = File::create(dir.clone()).unwrap();

file.write_all("id,city,t\n".as_bytes()).unwrap();
file.write_all("1,San Francisco,2021-01-24 12:12:23 UTC\n".as_bytes()).unwrap();
file.write_all("2,New York,2021-01-24 19:12:23 UTC\n".as_bytes()).unwrap();
file.write_all("id,city,t,arr\n".as_bytes()).unwrap();
file.write_all("1,San Francisco,2021-01-24 12:12:23 UTC,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\"\n".as_bytes()).unwrap();
file.write_all("2,New York,2021-01-24 19:12:23 UTC,\"[\"\"\"\"]\"\n".as_bytes()).unwrap();

dir
};

let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap();
let _ = service.exec_query(&format!("CREATE TABLE Foo.Persons (id int, city text, t timestamp) INDEX persons_city (`city`, `id`) LOCATION '{}'", path.as_os_str().to_string_lossy())).await.unwrap();
let _ = service.exec_query(&format!("CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION '{}'", path.as_os_str().to_string_lossy())).await.unwrap();
let res = service.exec_query("CREATE INDEX by_city ON Foo.Persons (city)").await;
let error = format!("{:?}", res);
assert!(error.contains("has data"));

let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons").await.unwrap();
assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(2)]));

let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons WHERE arr = '[\"Foo\",\"Bar\",\"FooBar\"]' or arr = '[\"\"]'").await.unwrap();
assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(2)])]);
}).await;
}

Expand Down

0 comments on commit a3e118e

Please sign in to comment.