Skip to content

Commit

Permalink
feat(graph, graphql, store): Support new Timestamp scalar
Browse files Browse the repository at this point in the history
  • Loading branch information
dotansimha authored and lutter committed Mar 21, 2024
1 parent c52c53b commit 52f4929
Show file tree
Hide file tree
Showing 51 changed files with 468 additions and 111 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 43 additions & 4 deletions graph/src/data/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub const BYTES_SCALAR: &str = "Bytes";
pub const BIG_INT_SCALAR: &str = "BigInt";
pub const BIG_DECIMAL_SCALAR: &str = "BigDecimal";
pub const INT8_SCALAR: &str = "Int8";
pub const TIMESTAMP_SCALAR: &str = "Timestamp";

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ValueType {
Expand All @@ -149,6 +150,7 @@ pub enum ValueType {
Int,
Int8,
String,
Timestamp,
}

impl FromStr for ValueType {
Expand All @@ -162,6 +164,7 @@ impl FromStr for ValueType {
"BigDecimal" => Ok(ValueType::BigDecimal),
"Int" => Ok(ValueType::Int),
"Int8" => Ok(ValueType::Int8),
"Timestamp" => Ok(ValueType::Timestamp),
"String" | "ID" => Ok(ValueType::String),
s => Err(anyhow!("Type not available in this context: {}", s)),
}
Expand All @@ -177,7 +180,9 @@ impl ValueType {
pub fn is_numeric(&self) -> bool {
match self {
ValueType::BigInt | ValueType::BigDecimal | ValueType::Int | ValueType::Int8 => true,
ValueType::Boolean | ValueType::Bytes | ValueType::String => false,
ValueType::Boolean | ValueType::Bytes | ValueType::String | ValueType::Timestamp => {
false
}
}
}

Expand All @@ -189,6 +194,7 @@ impl ValueType {
ValueType::BigDecimal => "BigDecimal",
ValueType::Int => "Int",
ValueType::Int8 => "Int8",
ValueType::Timestamp => "Timestamp",
ValueType::String => "String",
}
}
Expand Down Expand Up @@ -221,9 +227,14 @@ impl PartialOrd for ValueType {
| (BigDecimal, Int)
| (BigDecimal, Int8)
| (Int8, Int) => Some(Greater),
(Boolean, _) | (_, Boolean) | (Bytes, _) | (_, Bytes) | (String, _) | (_, String) => {
None
}
(Timestamp, _)
| (_, Timestamp)
| (Boolean, _)
| (_, Boolean)
| (Bytes, _)
| (_, Bytes)
| (String, _)
| (_, String) => None,
}
}
}
Expand All @@ -243,6 +254,7 @@ pub enum Value {
String(String),
Int(i32),
Int8(i64),
Timestamp(scalar::Timestamp),
BigDecimal(scalar::BigDecimal),
Bool(bool),
List(Vec<Value>),
Expand Down Expand Up @@ -298,6 +310,9 @@ impl stable_hash_legacy::StableHash for Value {
BigInt(inner) => {
stable_hash_legacy::StableHash::stable_hash(inner, sequence_number, state)
}
Timestamp(inner) => {
stable_hash_legacy::StableHash::stable_hash(inner, sequence_number, state)
}
}
}
}
Expand Down Expand Up @@ -345,6 +360,10 @@ impl StableHash for Value {
inner.stable_hash(field_address.child(0), state);
8
}
Timestamp(inner) => {
inner.stable_hash(field_address.child(0), state);
9
}
};

state.write(field_address, &[variant])
Expand Down Expand Up @@ -391,6 +410,14 @@ impl Value {
INT8_SCALAR => Value::Int8(s.parse::<i64>().map_err(|_| {
QueryExecutionError::ValueParseError("Int8".to_string(), format!("{}", s))
})?),
TIMESTAMP_SCALAR => {
Value::Timestamp(scalar::Timestamp::parse_timestamp(s).map_err(|_| {
QueryExecutionError::ValueParseError(
"Timestamp".to_string(),
format!("{}", s),
)
})?)
}
_ => Value::String(s.clone()),
}
}
Expand Down Expand Up @@ -491,6 +518,7 @@ impl Value {
Value::Bytes(_) => "Bytes".to_owned(),
Value::Int(_) => "Int".to_owned(),
Value::Int8(_) => "Int8".to_owned(),
Value::Timestamp(_) => "Timestamp".to_owned(),
Value::List(values) => {
if let Some(v) = values.first() {
format!("[{}]", v.type_name())
Expand All @@ -512,6 +540,7 @@ impl Value {
| (Value::Bytes(_), ValueType::Bytes)
| (Value::Int(_), ValueType::Int)
| (Value::Int8(_), ValueType::Int8)
| (Value::Timestamp(_), ValueType::Timestamp)
| (Value::Null, _) => true,
(Value::List(values), _) if is_list => values
.iter()
Expand All @@ -534,6 +563,7 @@ impl fmt::Display for Value {
Value::String(s) => s.to_string(),
Value::Int(i) => i.to_string(),
Value::Int8(i) => i.to_string(),
Value::Timestamp(i) => i.to_string(),
Value::BigDecimal(d) => d.to_string(),
Value::Bool(b) => b.to_string(),
Value::Null => "null".to_string(),
Expand All @@ -552,6 +582,7 @@ impl fmt::Debug for Value {
Self::String(s) => f.debug_tuple("String").field(s).finish(),
Self::Int(i) => f.debug_tuple("Int").field(i).finish(),
Self::Int8(i) => f.debug_tuple("Int8").field(i).finish(),
Self::Timestamp(i) => f.debug_tuple("Timestamp").field(i).finish(),
Self::BigDecimal(d) => d.fmt(f),
Self::Bool(arg0) => f.debug_tuple("Bool").field(arg0).finish(),
Self::List(arg0) => f.debug_tuple("List").field(arg0).finish(),
Expand All @@ -568,6 +599,7 @@ impl From<Value> for q::Value {
Value::String(s) => q::Value::String(s),
Value::Int(i) => q::Value::Int(q::Number::from(i)),
Value::Int8(i) => q::Value::String(i.to_string()),
Value::Timestamp(ts) => q::Value::String(ts.as_microseconds_since_epoch().to_string()),
Value::BigDecimal(d) => q::Value::String(d.to_string()),
Value::Bool(b) => q::Value::Boolean(b),
Value::Null => q::Value::Null,
Expand All @@ -586,6 +618,7 @@ impl From<Value> for r::Value {
Value::String(s) => r::Value::String(s),
Value::Int(i) => r::Value::Int(i as i64),
Value::Int8(i) => r::Value::String(i.to_string()),
Value::Timestamp(i) => r::Value::Timestamp(i),
Value::BigDecimal(d) => r::Value::String(d.to_string()),
Value::Bool(b) => r::Value::Boolean(b),
Value::Null => r::Value::Null,
Expand Down Expand Up @@ -622,6 +655,12 @@ impl From<scalar::Bytes> for Value {
}
}

impl From<scalar::Timestamp> for Value {
fn from(value: scalar::Timestamp) -> Value {
Value::Timestamp(value)
}
}

impl From<bool> for Value {
fn from(value: bool) -> Value {
Value::Bool(value)
Expand Down
85 changes: 82 additions & 3 deletions graph/src/data/store/scalar.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use diesel::deserialize::{FromSql, FromSqlRow};
use diesel::expression::AsExpression;
use diesel::serialize::ToSql;
Expand All @@ -13,6 +14,7 @@ use web3::types::*;

use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Display, Formatter};
use std::num::ParseIntError;
use std::ops::{Add, BitAnd, BitOr, Deref, Div, Mul, Rem, Shl, Shr, Sub};
use std::str::FromStr;

Expand Down Expand Up @@ -677,14 +679,91 @@ impl From<Vec<u8>> for Bytes {
}

impl ToSql<diesel::sql_types::Binary, diesel::pg::Pg> for Bytes {
fn to_sql(
&self,
out: &mut diesel::serialize::Output<diesel::pg::Pg>,
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
) -> diesel::serialize::Result {
<_ as ToSql<diesel::sql_types::Binary, _>>::to_sql(self.as_slice(), &mut out.reborrow())
}
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Timestamp(pub DateTime<Utc>);

#[derive(thiserror::Error, Debug)]
pub enum TimestampError {
#[error("Invalid timestamp string: {0}")]
StringParseError(ParseIntError),
#[error("Invalid timestamp format")]
InvalidTimestamp,
}

impl Timestamp {
pub fn parse_timestamp(v: &str) -> Result<Self, TimestampError> {
let as_num: i64 = v.parse().map_err(TimestampError::StringParseError)?;
Timestamp::from_microseconds_since_epoch(as_num)
}

pub fn from_rfc3339(v: &str) -> Result<Self, chrono::ParseError> {
Ok(Timestamp(DateTime::parse_from_rfc3339(v)?.into()))
}

pub fn from_microseconds_since_epoch(micros: i64) -> Result<Self, TimestampError> {
let secs = micros / 1_000_000;
let ns = (micros % 1_000_000) * 1_000;

match DateTime::from_timestamp(secs, ns as u32) {
Some(dt) => Ok(Self(dt)),
None => Err(TimestampError::InvalidTimestamp),
}
}

pub fn as_microseconds_since_epoch(&self) -> i64 {
self.0.timestamp_micros()
}
}

impl StableHash for Timestamp {
fn stable_hash<H: stable_hash::StableHasher>(&self, field_address: H::Addr, state: &mut H) {
self.0.timestamp_micros().stable_hash(field_address, state)
}
}

impl stable_hash_legacy::StableHash for Timestamp {
fn stable_hash<H: stable_hash_legacy::StableHasher>(
&self,
sequence_number: H::Seq,
state: &mut H,
) {
stable_hash_legacy::StableHash::stable_hash(
&self.0.timestamp_micros(),
sequence_number,
state,
)
}
}

impl Display for Timestamp {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", self.as_microseconds_since_epoch())
}
}

impl ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
) -> diesel::serialize::Result {
<_ as ToSql<diesel::sql_types::Timestamptz, _>>::to_sql(&self.0, &mut out.reborrow())
}
}

impl GasSizeOf for Timestamp {
fn const_gas_size_of() -> Option<Gas> {
Some(Gas::new(std::mem::size_of::<Timestamp>().saturating_into()))
}
}

#[cfg(test)]
mod test {
use super::{BigDecimal, BigInt, Bytes};
Expand Down
15 changes: 14 additions & 1 deletion graph/src/data/store/sql.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::anyhow;
use diesel::pg::Pg;
use diesel::serialize::{self, Output, ToSql};
use diesel::sql_types::{Binary, Bool, Int8, Integer, Text};
use diesel::sql_types::{Binary, Bool, Int8, Integer, Text, Timestamptz};

use std::str::FromStr;

Expand Down Expand Up @@ -47,6 +47,19 @@ impl ToSql<Int8, Pg> for Value {
}
}

impl ToSql<Timestamptz, Pg> for Value {
fn to_sql(&self, out: &mut Output<Pg>) -> serialize::Result {
match self {
Value::Timestamp(i) => i.to_sql(&mut out.reborrow()),
v => Err(anyhow!(
"Failed to convert non-timestamp attribute value to timestamp in SQL: {}",
v
)
.into()),
}
}
}

impl ToSql<Text, Pg> for Value {
fn to_sql(&self, out: &mut Output<Pg>) -> serialize::Result {
match self {
Expand Down
23 changes: 22 additions & 1 deletion graph/src/data/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::iter::FromIterator;

use super::store::scalar;

/// An immutable string that is more memory-efficient since it only has an
/// overhead of 16 bytes for storing a string vs the 24 bytes that `String`
/// requires
Expand Down Expand Up @@ -305,6 +307,7 @@ pub enum Value {
Enum(String),
List(Vec<Value>),
Object(Object),
Timestamp(scalar::Timestamp),
}

impl Value {
Expand Down Expand Up @@ -348,6 +351,10 @@ impl Value {
}
("Int8", Value::Int(num)) => Ok(Value::String(num.to_string())),
("Int8", Value::String(num)) => Ok(Value::String(num)),
("Timestamp", Value::Timestamp(ts)) => Ok(Value::Timestamp(ts)),
("Timestamp", Value::String(ts_str)) => Ok(Value::Timestamp(
scalar::Timestamp::parse_timestamp(&ts_str).map_err(|_| Value::String(ts_str))?,
)),
("String", Value::String(s)) => Ok(Value::String(s)),
("ID", Value::String(s)) => Ok(Value::String(s)),
("ID", Value::Int(n)) => Ok(Value::String(n.to_string())),
Expand Down Expand Up @@ -394,14 +401,21 @@ impl std::fmt::Display for Value {
}
write!(f, "}}")
}
Value::Timestamp(ref ts) => {
write!(f, "\"{}\"", ts.as_microseconds_since_epoch().to_string())
}
}
}
}

impl CacheWeight for Value {
fn indirect_weight(&self) -> usize {
match self {
Value::Boolean(_) | Value::Int(_) | Value::Null | Value::Float(_) => 0,
Value::Boolean(_)
| Value::Int(_)
| Value::Null
| Value::Float(_)
| Value::Timestamp(_) => 0,
Value::Enum(s) | Value::String(s) => s.indirect_weight(),
Value::List(l) => l.indirect_weight(),
Value::Object(o) => o.indirect_weight(),
Expand All @@ -426,6 +440,9 @@ impl Serialize for Value {
}
seq.end()
}
Value::Timestamp(ts) => {
serializer.serialize_str(&ts.as_microseconds_since_epoch().to_string().as_str())
}
Value::Null => serializer.serialize_none(),
Value::String(s) => serializer.serialize_str(s),
Value::Object(o) => {
Expand Down Expand Up @@ -519,6 +536,7 @@ impl From<Value> for q::Value {
}
q::Value::Object(rmap)
}
Value::Timestamp(ts) => q::Value::String(ts.as_microseconds_since_epoch().to_string()),
}
}
}
Expand All @@ -534,6 +552,9 @@ impl std::fmt::Debug for Value {
Value::Enum(e) => write!(f, "{e}"),
Value::List(l) => f.debug_list().entries(l).finish(),
Value::Object(o) => write!(f, "{o:?}"),
Value::Timestamp(ts) => {
write!(f, "{:?}", ts.as_microseconds_since_epoch().to_string())
}
}
}
}
Loading

0 comments on commit 52f4929

Please sign in to comment.