Skip to content

Commit

Permalink
feat(python, rust): timestampNtz support (#2236)
Browse files Browse the repository at this point in the history
# Description

- This addresses all our timestamp inconsistencies, where we were
reading Primitive::Timestamp as a datetime without UTC, and now we can
properly write datetimes with no timezone as columns to
Primitive::TimestampNtz.
- Addresses a small bug where the checkConstraints feature was not set in
writerFeatures when the table is on writer version 7.
- bumping default protocol to 3,7
- Made the pyarrow writer and reader more flexible so we can write/read
a 3,7 table as long as it has the supported features there.
- Properly parses timestamps with UTC into pyarrow timestamps with UTC
- Added config-key-to-table-feature translation inside the Create
operation

# Related Issue(s)
- closes #1598
- closes #1019
- closes #1777
  • Loading branch information
ion-elgreco committed Mar 5, 2024
1 parent c3d532b commit cbcf6df
Show file tree
Hide file tree
Showing 23 changed files with 341 additions and 92 deletions.
22 changes: 17 additions & 5 deletions crates/core/src/kernel/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ impl TryFrom<&DataType> for ArrowDataType {
// timezone. Stored as 4 bytes integer representing days since 1970-01-01
Ok(ArrowDataType::Date32)
}
PrimitiveType::Timestamp => {
// Issue: https://github.com/delta-io/delta/issues/643
PrimitiveType::Timestamp => Ok(ArrowDataType::Timestamp(
TimeUnit::Microsecond,
Some("UTC".into()),
)),
PrimitiveType::TimestampNtz => {
Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}
}
Expand Down Expand Up @@ -217,7 +220,7 @@ impl TryFrom<&ArrowDataType> for DataType {
ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)),
ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
Ok(DataType::Primitive(PrimitiveType::Timestamp))
Ok(DataType::Primitive(PrimitiveType::TimestampNtz))
}
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz))
if tz.eq_ignore_ascii_case("utc") =>
Expand Down Expand Up @@ -770,18 +773,27 @@ mod tests {
#[test]
fn test_arrow_from_delta_timestamp_type() {
let timestamp_field = DataType::Primitive(PrimitiveType::Timestamp);
assert_eq!(
<ArrowDataType as TryFrom<&DataType>>::try_from(&timestamp_field).unwrap(),
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into()))
);
}

#[test]
fn test_arrow_from_delta_timestampntz_type() {
let timestamp_field = DataType::Primitive(PrimitiveType::TimestampNtz);
assert_eq!(
<ArrowDataType as TryFrom<&DataType>>::try_from(&timestamp_field).unwrap(),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
);
}

#[test]
fn test_delta_from_arrow_timestamp_type() {
fn test_delta_from_arrow_timestamp_type_no_tz() {
let timestamp_field = ArrowDataType::Timestamp(TimeUnit::Microsecond, None);
assert_eq!(
<DataType as TryFrom<&ArrowDataType>>::try_from(&timestamp_field).unwrap(),
DataType::Primitive(PrimitiveType::Timestamp)
DataType::Primitive(PrimitiveType::TimestampNtz)
);
}

Expand Down
8 changes: 7 additions & 1 deletion crates/core/src/kernel/expressions/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ impl Scalar {
Double(val) => Arc::new(Float64Array::from_value(*val, num_rows)),
String(val) => Arc::new(StringArray::from(vec![val.clone(); num_rows])),
Boolean(val) => Arc::new(BooleanArray::from(vec![*val; num_rows])),
Timestamp(val) => Arc::new(TimestampMicrosecondArray::from_value(*val, num_rows)),
Timestamp(val) => {
Arc::new(TimestampMicrosecondArray::from_value(*val, num_rows).with_timezone("UTC"))
}
TimestampNtz(val) => Arc::new(TimestampMicrosecondArray::from_value(*val, num_rows)),
Date(val) => Arc::new(Date32Array::from_value(*val, num_rows)),
Binary(val) => Arc::new(BinaryArray::from(vec![val.as_slice(); num_rows])),
Decimal(val, precision, scale) => Arc::new(
Expand All @@ -64,6 +67,9 @@ impl Scalar {
PrimitiveType::String => Arc::new(StringArray::new_null(num_rows)),
PrimitiveType::Boolean => Arc::new(BooleanArray::new_null(num_rows)),
PrimitiveType::Timestamp => {
Arc::new(TimestampMicrosecondArray::new_null(num_rows).with_timezone("UTC"))
}
PrimitiveType::TimestampNtz => {
Arc::new(TimestampMicrosecondArray::new_null(num_rows))
}
PrimitiveType::Date => Arc::new(Date32Array::new_null(num_rows)),
Expand Down
32 changes: 27 additions & 5 deletions crates/core/src/kernel/expressions/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub enum Scalar {
Boolean(bool),
/// Microsecond precision timestamp, adjusted to UTC.
Timestamp(i64),
/// Microsecond precision timestamp, with no timezone.
TimestampNtz(i64),
/// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
Date(i32),
/// Binary data
Expand All @@ -58,6 +60,7 @@ impl Scalar {
Self::String(_) => DataType::Primitive(PrimitiveType::String),
Self::Boolean(_) => DataType::Primitive(PrimitiveType::Boolean),
Self::Timestamp(_) => DataType::Primitive(PrimitiveType::Timestamp),
Self::TimestampNtz(_) => DataType::Primitive(PrimitiveType::TimestampNtz),
Self::Date(_) => DataType::Primitive(PrimitiveType::Date),
Self::Binary(_) => DataType::Primitive(PrimitiveType::Binary),
Self::Decimal(_, precision, scale) => DataType::decimal(*precision, *scale),
Expand Down Expand Up @@ -88,7 +91,7 @@ impl Scalar {
"false".to_string()
}
}
Self::Timestamp(ts) => {
Self::TimestampNtz(ts) | Self::Timestamp(ts) => {
let ts = Utc.timestamp_micros(*ts).single().unwrap();
ts.format("%Y-%m-%d %H:%M:%S%.6f").to_string()
}
Expand Down Expand Up @@ -222,10 +225,17 @@ impl Scalar {
.downcast_ref::<Date32Array>()
.map(|v| Self::Date(v.value(index))),
// TODO handle timezones when implementing timestamp ntz feature.
Timestamp(TimeUnit::Microsecond, None) => arr
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.map(|v| Self::Timestamp(v.value(index))),
Timestamp(TimeUnit::Microsecond, tz) => match tz {
None => arr
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.map(|v| Self::Timestamp(v.value(index))),
Some(tz_str) if tz_str.as_ref() == "UTC" => arr
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.map(|v| Self::Timestamp(v.clone().with_timezone("UTC").value(index))),
_ => None,
},
Struct(fields) => {
let struct_fields = fields
.iter()
Expand Down Expand Up @@ -283,6 +293,7 @@ impl PartialOrd for Scalar {
(String(a), String(b)) => a.partial_cmp(b),
(Boolean(a), Boolean(b)) => a.partial_cmp(b),
(Timestamp(a), Timestamp(b)) => a.partial_cmp(b),
(TimestampNtz(a), TimestampNtz(b)) => a.partial_cmp(b),
(Date(a), Date(b)) => a.partial_cmp(b),
(Binary(a), Binary(b)) => a.partial_cmp(b),
(Decimal(a, _, _), Decimal(b, _, _)) => a.partial_cmp(b),
Expand All @@ -308,6 +319,7 @@ impl Display for Scalar {
Self::String(s) => write!(f, "'{}'", s),
Self::Boolean(b) => write!(f, "{}", b),
Self::Timestamp(ts) => write!(f, "{}", ts),
Self::TimestampNtz(ts) => write!(f, "{}", ts),
Self::Date(d) => write!(f, "{}", d),
Self::Binary(b) => write!(f, "{:?}", b),
Self::Decimal(value, _, scale) => match scale.cmp(&0) {
Expand Down Expand Up @@ -433,6 +445,16 @@ impl PrimitiveType {
.ok_or(self.parse_error(raw))?;
Ok(Scalar::Timestamp(micros))
}
TimestampNtz => {
let timestamp = NaiveDateTime::parse_from_str(raw, "%Y-%m-%d %H:%M:%S%.f")
.map_err(|_| self.parse_error(raw))?;
let timestamp = Utc.from_utc_datetime(&timestamp);
let micros = timestamp
.signed_duration_since(*UNIX_EPOCH)
.num_microseconds()
.ok_or(self.parse_error(raw))?;
Ok(Scalar::TimestampNtz(micros))
}
Binary => {
let bytes = parse_escaped_binary_string(raw).map_err(|_| self.parse_error(raw))?;
Ok(Scalar::Binary(bytes))
Expand Down
20 changes: 11 additions & 9 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub enum ReaderFeatures {
/// Deletion vectors for merge, update, delete
DeletionVectors,
/// timestamps without timezone support
#[serde(alias = "timestampNtz")]
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// version 2 of checkpointing
V2Checkpoint,
Expand All @@ -189,7 +189,9 @@ impl From<&parquet::record::Field> for ReaderFeatures {
match value {
parquet::record::Field::Str(feature) => match feature.as_str() {
"columnMapping" => ReaderFeatures::ColumnMapping,
"deletionVectors" => ReaderFeatures::DeletionVectors,
"deletionVectors" | "delta.enableDeletionVectors" => {
ReaderFeatures::DeletionVectors
}
"timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
"v2Checkpoint" => ReaderFeatures::V2Checkpoint,
f => ReaderFeatures::Other(f.to_string()),
Expand Down Expand Up @@ -259,7 +261,7 @@ pub enum WriterFeatures {
/// Row tracking on tables
RowTracking,
/// timestamps without timezone support
#[serde(alias = "timestampNtz")]
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// domain specific metadata
DomainMetadata,
Expand All @@ -281,15 +283,15 @@ impl From<String> for WriterFeatures {
impl From<&str> for WriterFeatures {
fn from(value: &str) -> Self {
match value {
"appendOnly" => WriterFeatures::AppendOnly,
"invariants" => WriterFeatures::Invariants,
"checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" => WriterFeatures::ChangeDataFeed,
"appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly,
"invariants" | "delta.invariants" => WriterFeatures::Invariants,
"checkConstraints" | "delta.checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed,
"generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping,
"identityColumns" => WriterFeatures::IdentityColumns,
"deletionVectors" => WriterFeatures::DeletionVectors,
"rowTracking" => WriterFeatures::RowTracking,
"deletionVectors" | "delta.enableDeletionVectors" => WriterFeatures::DeletionVectors,
"rowTracking" | "delta.enableRowTracking" => WriterFeatures::RowTracking,
"timestampNtz" => WriterFeatures::TimestampWithoutTimezone,
"domainMetadata" => WriterFeatures::DomainMetadata,
"v2Checkpoint" => WriterFeatures::V2Checkpoint,
Expand Down
8 changes: 6 additions & 2 deletions crates/core/src/kernel/models/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ fn default_true() -> bool {
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "snake_case")]
/// Primitive types supported by Delta
pub enum PrimitiveType {
/// UTF-8 encoded string of characters
Expand All @@ -493,7 +493,9 @@ pub enum PrimitiveType {
Date,
/// Microsecond precision timestamp, adjusted to UTC.
Timestamp,
// TODO: timestamp without timezone
/// Microsecond precision timestamp with no timezone
#[serde(alias = "timestampNtz")]
TimestampNtz,
#[serde(
serialize_with = "serialize_decimal",
deserialize_with = "deserialize_decimal",
Expand Down Expand Up @@ -554,6 +556,7 @@ impl Display for PrimitiveType {
PrimitiveType::Binary => write!(f, "binary"),
PrimitiveType::Date => write!(f, "date"),
PrimitiveType::Timestamp => write!(f, "timestamp"),
PrimitiveType::TimestampNtz => write!(f, "timestampNtz"),
PrimitiveType::Decimal(precision, scale) => {
write!(f, "decimal({},{})", precision, scale)
}
Expand Down Expand Up @@ -608,6 +611,7 @@ impl DataType {
pub const BINARY: Self = DataType::Primitive(PrimitiveType::Binary);
pub const DATE: Self = DataType::Primitive(PrimitiveType::Date);
pub const TIMESTAMP: Self = DataType::Primitive(PrimitiveType::Timestamp);
pub const TIMESTAMPNTZ: Self = DataType::Primitive(PrimitiveType::TimestampNtz);

pub fn decimal(precision: u8, scale: i8) -> Self {
DataType::Primitive(PrimitiveType::Decimal(precision, scale))
Expand Down
14 changes: 12 additions & 2 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext,
};
use crate::kernel::{CommitInfo, IsolationLevel, Protocol};
use crate::kernel::{CommitInfo, IsolationLevel, Protocol, WriterFeatures};
use crate::logstore::LogStoreRef;
use crate::operations::datafusion_utils::Expression;
use crate::operations::transaction::commit;
Expand Down Expand Up @@ -177,7 +177,17 @@ impl std::future::IntoFuture for ConstraintBuilder {
3
},
reader_features: old_protocol.reader_features.clone(),
writer_features: old_protocol.writer_features.clone(),
writer_features: if old_protocol.min_writer_version < 7 {
old_protocol.writer_features.clone()
} else {
let current_features = old_protocol.writer_features.clone();
if let Some(mut features) = current_features {
features.insert(WriterFeatures::CheckConstraints);
Some(features)
} else {
current_features
}
},
};

let operational_parameters = HashMap::from_iter([
Expand Down
51 changes: 45 additions & 6 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
//! Command for creating a new delta table
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use futures::future::BoxFuture;
use serde_json::Value;

use super::transaction::{commit, PROTOCOL};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType};
use crate::kernel::{
Action, DataType, Metadata, Protocol, ReaderFeatures, StructField, StructType, WriterFeatures,
};
use crate::logstore::{LogStore, LogStoreRef};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::builder::ensure_table_uri;
Expand Down Expand Up @@ -233,8 +235,45 @@ impl CreateBuilder {
)
};

let contains_timestampntz = &self
.columns
.iter()
.any(|f| f.data_type() == &DataType::TIMESTAMPNTZ);

// TODO configure more permissive versions based on configuration. Also how should this ideally be handled?
// We set the lowest protocol we can, and if subsequent writes use newer features we update metadata?

let (min_reader_version, min_writer_version, writer_features, reader_features) =
if *contains_timestampntz {
let mut converted_writer_features = self
.configuration
.keys()
.map(|key| key.clone().into())
.filter(|v| !matches!(v, WriterFeatures::Other(_)))
.collect::<HashSet<WriterFeatures>>();

let mut converted_reader_features = self
.configuration
.keys()
.map(|key| key.clone().into())
.filter(|v| !matches!(v, ReaderFeatures::Other(_)))
.collect::<HashSet<ReaderFeatures>>();
converted_writer_features.insert(WriterFeatures::TimestampWithoutTimezone);
converted_reader_features.insert(ReaderFeatures::TimestampWithoutTimezone);
(
3,
7,
Some(converted_writer_features),
Some(converted_reader_features),
)
} else {
(
PROTOCOL.default_reader_version(),
PROTOCOL.default_writer_version(),
None,
None,
)
};
let protocol = self
.actions
.iter()
Expand All @@ -244,10 +283,10 @@ impl CreateBuilder {
_ => unreachable!(),
})
.unwrap_or_else(|| Protocol {
min_reader_version: PROTOCOL.default_reader_version(),
min_writer_version: PROTOCOL.default_writer_version(),
writer_features: None,
reader_features: None,
min_reader_version,
min_writer_version,
writer_features,
reader_features,
});

let mut metadata = Metadata::try_new(
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,7 @@ mod tests {
table
}

// TODO(ion): property keys are not passed through or translated as table features.. fix this as well
#[tokio::test]
async fn test_merge_when_delta_table_is_append_only() {
let schema = get_arrow_schema(&None);
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ pub enum TransactionError {
UnsupportedWriterFeatures(Vec<WriterFeatures>),

/// Error returned when writer features are required but not specified
#[error("Writer features must be specified for writerversion >= 7")]
WriterFeaturesRequired,
#[error("Writer features must be specified for writerversion >= 7, please specify: {0:?}")]
WriterFeaturesRequired(WriterFeatures),

/// Error returned when reader features are required but not specified
#[error("Reader features must be specified for reader version >= 3")]
ReaderFeaturesRequired,
#[error("Reader features must be specified for reader version >= 3, please specify: {0:?}")]
ReaderFeaturesRequired(ReaderFeatures),

/// The transaction failed to commit due to an error in an implementation-specific layer.
/// Currently used by DynamoDb-backed S3 log store when database operations fail.
Expand Down
Loading

0 comments on commit cbcf6df

Please sign in to comment.