Skip to content

Commit 96cbc9d

Browse files
fix: Drop single events if corrupted instead of failing whole import
1 parent 7d55fca commit 96cbc9d

File tree

11 files changed

+354
-122
lines changed

11 files changed

+354
-122
lines changed

Cargo.lock

Lines changed: 89 additions & 85 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aw-datastore/src/datastore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ impl DatastoreInstance {
373373
self.buckets_cache.insert(bucket.id.clone(), bucket.clone());
374374
// Insert events
375375
if let Some(events) = events {
376-
self.insert_events(conn, &bucket.id, events)?;
376+
self.insert_events(conn, &bucket.id, events.take_inner())?;
377377
bucket.events = None;
378378
}
379379
Ok(())

aw-datastore/src/legacy_import.rs

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -103,30 +103,7 @@ mod import {
103103
let timestamp_str: String = row.get(0)?;
104104
let duration_float: f64 = row.get(1)?;
105105
let data_str: String = row.get(2)?;
106-
107-
let timestamp_str = timestamp_str.replace(" ", "T");
108-
let timestamp = match DateTime::parse_from_rfc3339(&timestamp_str) {
109-
Ok(timestamp) => timestamp.with_timezone(&Utc),
110-
Err(err) => panic!("Timestamp string {}: {:?}", timestamp_str, err),
111-
};
112-
113-
let duration_ns = (duration_float * 1_000_000_000.0) as i64;
114-
115-
let data: serde_json::map::Map<String, serde_json::Value> =
116-
match serde_json::from_str(&data_str) {
117-
Ok(data) => data,
118-
Err(err) => panic!(
119-
"Unable to parse JSON data in event from bucket {}\n{}\n{}",
120-
bucket_id, err, data_str
121-
),
122-
};
123-
124-
Ok(Event {
125-
id: None,
126-
timestamp,
127-
duration: Duration::nanoseconds(duration_ns),
128-
data,
129-
})
106+
Ok((timestamp_str, duration_float, data_str))
130107
}) {
131108
Ok(rows) => rows,
132109
Err(err) => {
@@ -139,7 +116,35 @@ mod import {
139116
let mut list = Vec::new();
140117
for row in rows {
141118
match row {
142-
Ok(event) => list.push(event),
119+
Ok((timestamp_str, duration_float, data_str)) => {
120+
let timestamp_str = timestamp_str.replace(" ", "T");
121+
let timestamp = match DateTime::parse_from_rfc3339(&timestamp_str) {
122+
Ok(timestamp) => timestamp.with_timezone(&Utc),
123+
Err(err) => panic!("Timestamp string {}: {:?}", timestamp_str, err),
124+
};
125+
126+
let duration_ns = (duration_float * 1_000_000_000.0) as i64;
127+
128+
let data: serde_json::map::Map<String, serde_json::Value> =
129+
match serde_json::from_str(&data_str) {
130+
Ok(data) => data,
131+
Err(err) => {
132+
warn!(
133+
"Unable to parse JSON data in event from bucket {}\n{}\n{}",
134+
bucket_id, err, data_str
135+
);
136+
continue;
137+
}
138+
};
139+
140+
let event = Event {
141+
id: None,
142+
timestamp,
143+
duration: Duration::nanoseconds(duration_ns),
144+
data,
145+
};
146+
list.push(event)
147+
}
143148
Err(err) => panic!("Corrupt event in bucket {}: {}", bucket_id, err),
144149
};
145150
}

aw-models/examples/schema.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
extern crate aw_models;
2+
3+
use schemars::schema_for;
4+
5+
fn main() {
6+
let schema = schema_for!(aw_models::Bucket);
7+
println!("{}", serde_json::to_string_pretty(&schema).unwrap());
8+
}

aw-models/src/bucket.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use serde_json::value::Value;
77
use std::collections::HashMap;
88

99
use crate::Event;
10+
use crate::TryVec;
1011

1112
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)]
1213
pub struct Bucket {
@@ -22,7 +23,10 @@ pub struct Bucket {
2223
pub data: Map<String, Value>,
2324
#[serde(default, skip_deserializing)]
2425
pub metadata: BucketMetadata,
25-
pub events: Option<Vec<Event>>, /* Should only be set during import/export */
26+
// Events should only be "Some" during import/export
27+
// It uses a TryVec to discard only the events which failed to be deserialized, so that only a
28+
// few events are being dropped during import instead of failing the whole import
29+
pub events: Option<TryVec<Event>>,
2630
pub last_updated: Option<DateTime<Utc>>, // TODO: Should probably be moved into metadata field
2731
}
2832

aw-models/src/export.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#[derive(Serialize, Deserialize, JsonSchema, Clone)]
2+
pub struct BucketsExport {
3+
pub buckets: HashMap<String, Bucket>,
4+
}

aw-models/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod info;
2424
mod key_value;
2525
mod query;
2626
mod timeinterval;
27+
mod tryvec;
2728

2829
pub use self::bucket::Bucket;
2930
pub use self::bucket::BucketMetadata;
@@ -34,3 +35,4 @@ pub use self::key_value::Key;
3435
pub use self::key_value::KeyValue;
3536
pub use self::query::Query;
3637
pub use self::timeinterval::TimeInterval;
38+
pub use self::tryvec::TryVec;

aw-models/src/tryvec.rs

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
use schemars::JsonSchema;
2+
use serde::de::{DeserializeOwned, SeqAccess, Visitor};
3+
use serde::export::PhantomData;
4+
use serde::ser::SerializeSeq;
5+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
6+
use serde_json::Value;
7+
use std::fmt;
8+
use std::fmt::Debug;
9+
10+
#[derive(Debug, Clone, JsonSchema)]
11+
#[serde(untagged)]
12+
// TODO: JsonSchema is invalid, we should only allow "Parsed" value as the
13+
// others will be dropped
14+
pub enum TryParse<T> {
15+
Parsed(T),
16+
Unparsed(Value),
17+
NotPresent,
18+
}
19+
20+
impl<'de, T: DeserializeOwned> Deserialize<'de> for TryParse<T> {
21+
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
22+
match Option::<Value>::deserialize(deserializer)? {
23+
None => Ok(TryParse::NotPresent),
24+
Some(value) => match T::deserialize(&value) {
25+
Ok(t) => Ok(TryParse::Parsed(t)),
26+
Err(_) => Ok(TryParse::Unparsed(value)),
27+
},
28+
}
29+
}
30+
}
31+
32+
#[derive(Debug, Clone, JsonSchema)]
33+
#[serde(transparent)]
34+
pub struct TryVec<T> {
35+
inner: Vec<TryParse<T>>,
36+
}
37+
38+
impl<T> TryVec<T> {
39+
pub fn new(mut vec: Vec<T>) -> Self {
40+
let mut vec_marked: Vec<TryParse<T>> = Vec::new();
41+
for item in vec.drain(..) {
42+
vec_marked.push(TryParse::Parsed(item));
43+
}
44+
TryVec { inner: vec_marked }
45+
}
46+
47+
pub fn new_empty() -> Self {
48+
TryVec { inner: Vec::new() }
49+
}
50+
51+
pub fn take_inner(self) -> Vec<T> {
52+
let mut vec: Vec<T> = Vec::new();
53+
for item in self.inner {
54+
match item {
55+
TryParse::Parsed(i) => vec.push(i),
56+
_ => continue,
57+
};
58+
}
59+
return vec;
60+
}
61+
}
62+
63+
impl<T> Serialize for TryVec<T>
64+
where
65+
T: Serialize,
66+
{
67+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
68+
where
69+
S: Serializer,
70+
{
71+
let mut seq = serializer.serialize_seq(Some(self.inner.len()))?;
72+
for element in &self.inner {
73+
match element {
74+
TryParse::Parsed(t) => seq.serialize_element(t)?,
75+
_ => continue,
76+
};
77+
}
78+
seq.end()
79+
}
80+
}
81+
82+
struct TryVecVisitor<T> {
83+
marker: PhantomData<fn() -> TryVec<T>>,
84+
}
85+
86+
impl<T> TryVecVisitor<T> {
87+
fn new() -> Self {
88+
TryVecVisitor {
89+
marker: PhantomData,
90+
}
91+
}
92+
}
93+
94+
impl<'de, T> Visitor<'de> for TryVecVisitor<T>
95+
where
96+
T: DeserializeOwned,
97+
{
98+
type Value = TryVec<T>;
99+
100+
// Format a message stating what data this Visitor expects to receive.
101+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
102+
formatter.write_str("a seqence")
103+
}
104+
105+
fn visit_seq<M>(self, mut access: M) -> Result<Self::Value, M::Error>
106+
where
107+
M: SeqAccess<'de>,
108+
{
109+
let mut vec = Vec::new();
110+
111+
loop {
112+
let res = match access.next_element() {
113+
Ok(val) => val,
114+
Err(err) => {
115+
println!(
116+
"Failed to parse event because '{}', the event will be discarded",
117+
err
118+
);
119+
continue;
120+
}
121+
};
122+
match res {
123+
Some(item) => vec.push(item),
124+
None => break,
125+
};
126+
}
127+
128+
Ok(TryVec { inner: vec })
129+
}
130+
}
131+
132+
impl<'de, T> Deserialize<'de> for TryVec<T>
133+
where
134+
T: DeserializeOwned,
135+
{
136+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
137+
where
138+
D: Deserializer<'de>,
139+
{
140+
deserializer.deserialize_seq(TryVecVisitor::new())
141+
}
142+
}
143+
144+
#[cfg(test)]
mod test {
    use serde::{Deserialize, Serialize};

    use super::TryVec;

    /// Minimal event type with a single string field.
    #[derive(Deserialize, Serialize, Debug)]
    struct TestEvent {
        data: String,
    }

    /// Deserializes `data` into a `TryVec<TestEvent>`, serializes it back to
    /// JSON and asserts that the result equals `eq`.
    fn assert_serialized_deserialized_eq(data: &str, eq: &str) {
        let parsed: TryVec<TestEvent> = serde_json::from_str(data).unwrap();
        let round_tripped = serde_json::to_string(&parsed).unwrap();
        assert_eq!(round_tripped, eq);
    }

    #[test]
    fn test_serialize_deserialize() {
        // (label, input JSON, expected JSON after the round trip)
        let cases = [
            ("test empty array", r#"[]"#, r#"[]"#),
            (
                "test one valid event",
                r#"[{"data":"test"}]"#,
                r#"[{"data":"test"}]"#,
            ),
            (
                "test invalid type int, skip event",
                r#"[{ "data": 1 }]"#,
                r#"[]"#,
            ),
            (
                "test invalid type dict, skip event",
                r#"[{"data":{}}]"#,
                r#"[]"#,
            ),
            (
                "test invalid type arr, skip event",
                r#"[{"data":[]}]"#,
                r#"[]"#,
            ),
            (
                "test multiple valid events",
                r#"[{"data":"test"},{"data":"test2"},{"data":"test3"}]"#,
                r#"[{"data":"test"},{"data":"test2"},{"data":"test3"}]"#,
            ),
            (
                "test invalid event in middle of sequence, skip one event",
                r#"[{"data":"test"},{"data":2},{"data":"test3"}]"#,
                r#"[{"data":"test"},{"data":"test3"}]"#,
            ),
            (
                "test utf-16 character",
                r#"[{"data":"\ud835\udc47"}]"#,
                r#"[{"data":"𝑇"}]"#,
            ),
            (
                "test invalid utf-8/16, skip event",
                r#"[{"data":"\ud835"}]"#,
                r#"[]"#,
            ),
        ];

        for &(label, input, expected) in cases.iter() {
            println!("{}", label);
            assert_serialized_deserialized_eq(input, expected);
        }
    }

    #[test]
    fn test_methods() {
        // An empty TryVec yields an empty Vec.
        let taken = TryVec::<TestEvent>::new_empty().take_inner();
        assert_eq!(taken.len(), Vec::<TestEvent>::new().len());
    }
}

aw-server/src/endpoints/bucket.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use chrono::Utc;
99
use aw_models::Bucket;
1010
use aw_models::BucketsExport;
1111
use aw_models::Event;
12+
use aw_models::TryVec;
1213

1314
use rocket::http::Header;
1415
use rocket::http::Status;
@@ -174,11 +175,11 @@ pub fn bucket_export(
174175
Ok(bucket) => bucket,
175176
Err(err) => return Err(err.into()),
176177
};
177-
bucket.events = Some(
178-
datastore
179-
.get_events(&bucket_id, None, None, None)
180-
.expect("Failed to get events for bucket"),
181-
);
178+
/* TODO: Replace expect with http error */
179+
let events = datastore
180+
.get_events(&bucket_id, None, None, None)
181+
.expect("Failed to get events for bucket");
182+
bucket.events = Some(TryVec::new(events));
182183
export.buckets.insert(bucket_id.clone(), bucket);
183184
let filename = format!("aw-bucket-export_{}.json", bucket_id);
184185

aw-server/src/endpoints/export.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use rocket::response::Response;
77
use rocket::State;
88

99
use aw_models::BucketsExport;
10+
use aw_models::TryVec;
1011

1112
use crate::endpoints::{HttpErrorJson, ServerState};
1213

@@ -21,10 +22,11 @@ pub fn buckets_export(state: State<ServerState>) -> Result<Response, HttpErrorJs
2122
Err(err) => return Err(err.into()),
2223
};
2324
for (bid, mut bucket) in buckets.drain() {
24-
bucket.events = Some(match datastore.get_events(&bid, None, None, None) {
25+
let events = match datastore.get_events(&bid, None, None, None) {
2526
Ok(events) => events,
2627
Err(err) => return Err(err.into()),
27-
});
28+
};
29+
bucket.events = Some(TryVec::new(events));
2830
export.buckets.insert(bid, bucket);
2931
}
3032

0 commit comments

Comments
 (0)