Skip to content

Commit

Permalink
Add control for deserialization error behavior
Browse files Browse the repository at this point in the history
Adds a config option 'bad_data' to control whether deserialization
errors result in dropping the data or failing the job. The logic is
mostly contained in `source_collector.rs`, and it uses a new
`RateLimiter` struct to limit the logging frequency and user error
reporting.
  • Loading branch information
jbeisen committed Dec 15, 2023
1 parent 930346b commit 152090c
Show file tree
Hide file tree
Showing 35 changed files with 331 additions and 170 deletions.
1 change: 1 addition & 0 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 {
UdfPost,
GlobalUdf,
GlobalUdfCollection,
BadData,
)),
tags(
(name = "ping", description = "Ping endpoint"),
Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl Connector for BlackholeConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
bad_data: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Connector for DeltaLakeConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl Connector for FileSystemConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/fluvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl Connector for FluvioConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
2 changes: 2 additions & 0 deletions arroyo-connectors/src/impulse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub fn impulse_schema() -> ConnectionSchema {
ConnectionSchema {
format: None,
framing: None,
bad_data: None,
struct_name: Some("arroyo_types::ImpulseEvent".to_string()),
fields: vec![
source_field("counter", Primitive(PrimitiveType::UInt64)),
Expand Down Expand Up @@ -161,6 +162,7 @@ impl Connector for ImpulseConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
bad_data: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl Connector for KafkaConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl Connector for KinesisConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
2 changes: 2 additions & 0 deletions arroyo-connectors/src/nexmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn nexmark_schema() -> ConnectionSchema {
use arroyo_rpc::api_types::connections::PrimitiveType::*;
ConnectionSchema {
format: None,
bad_data: None,
framing: None,
struct_name: Some("arroyo_types::nexmark::Event".to_string()),
fields: vec![
Expand Down Expand Up @@ -207,6 +208,7 @@ impl Connector for NexmarkConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
bad_data: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ impl Connector for PollingHTTPConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl Connector for RedisConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/single_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl Connector for SingleFileConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Connector for SSEConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl Connector for WebhookConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl Connector for WebsocketConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
17 changes: 7 additions & 10 deletions arroyo-formats/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use apache_avro::{from_avro_datum, Reader, Schema, Writer};
use arrow::datatypes::{DataType, Field, Fields, TimeUnit};
use arroyo_rpc::formats::AvroFormat;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_types::UserError;
use arroyo_types::SourceError;
use serde::de::DeserializeOwned;
use serde_json::{json, Value as JsonValue};
use std::collections::HashMap;
Expand All @@ -17,7 +17,7 @@ pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
schema_registry: Arc<Mutex<HashMap<u32, Schema>>>,
resolver: Arc<dyn SchemaResolver + Sync>,
mut msg: &'a [u8],
) -> Result<impl Iterator<Item = Result<T, UserError>> + 'a, String> {
) -> Result<impl Iterator<Item = Result<T, SourceError>> + 'a, String> {
let id = if format.confluent_schema_registry {
let magic_byte = msg[0];
if magic_byte != 0 {
Expand Down Expand Up @@ -72,10 +72,7 @@ pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
let into_json = format.into_unstructured_json;
Ok(messages.into_iter().map(move |record| {
let value = record.map_err(|e| {
UserError::new(
"Deserialization failed",
format!("Failed to deserialize from avro: {:?}", e),
)
SourceError::bad_data(format!("Failed to deserialize from avro: {:?}", e))
})?;

if into_json {
Expand All @@ -84,10 +81,10 @@ pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
// for now round-trip through json in order to handle unsupported avro features
// as that allows us to rely on raw json deserialization
serde_json::from_value(avro_to_json(value)).map_err(|e| {
UserError::new(
"Deserialization failed",
format!("Failed to convert avro message into struct type: {:?}", e),
)
SourceError::bad_data(format!(
"Failed to convert avro message into struct type: {:?}",
e
))
})
}
}))
Expand Down
23 changes: 11 additions & 12 deletions arroyo-formats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, StringArray};
use arroyo_rpc::formats::{AvroFormat, Format, Framing, FramingMethod};
use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, SchemaResolver};
use arroyo_types::{Data, Debezium, RawJson, UserError};
use arroyo_types::{Data, Debezium, RawJson, SourceError};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::{json, Value};
Expand Down Expand Up @@ -290,7 +290,7 @@ impl<T: SchemaData> DataDeserializer<T> {
pub async fn deserialize_slice<'a>(
&mut self,
msg: &'a [u8],
) -> impl Iterator<Item = Result<T, UserError>> + 'a + Send {
) -> impl Iterator<Item = Result<T, SourceError>> + 'a + Send {
match &*self.format {
Format::Avro(avro) => {
let schema_registry = self.schema_registry.clone();
Expand All @@ -300,17 +300,21 @@ impl<T: SchemaData> DataDeserializer<T> {
{
Ok(iter) => Box::new(iter),
Err(e) => Box::new(
vec![Err(UserError::new("Avro deserialization failed", e))].into_iter(),
vec![Err(SourceError::other(
"Avro error",
format!("Avro deserialization failed: {}", e),
))]
.into_iter(),
)
as Box<dyn Iterator<Item = Result<T, UserError>> + Send>,
as Box<dyn Iterator<Item = Result<T, SourceError>> + Send>,
}
}
_ => {
let new_self = self.clone();
Box::new(
FramingIterator::new(self.framing.clone(), msg)
.map(move |t| new_self.deserialize_single(t)),
) as Box<dyn Iterator<Item = Result<T, UserError>> + Send>
) as Box<dyn Iterator<Item = Result<T, SourceError>> + Send>
}
}
}
Expand All @@ -319,19 +323,14 @@ impl<T: SchemaData> DataDeserializer<T> {
self.format.clone()
}

pub fn deserialize_single(&self, msg: &[u8]) -> Result<T, UserError> {
pub fn deserialize_single(&self, msg: &[u8]) -> Result<T, SourceError> {
match &*self.format {
Format::Json(json) => json::deserialize_slice_json(json, msg),
Format::Avro(_) => unreachable!("avro should be handled by here"),
Format::Parquet(_) => todo!("parquet is not supported as an input format"),
Format::RawString(_) => deserialize_raw_string(msg),
}
.map_err(|e| {
UserError::new(
"Deserialization failed",
format!("Failed to deserialize: {}", e),
)
})
.map_err(|e| SourceError::bad_data(format!("Failed to deserialize: {:?}", e)))
}
}

Expand Down
5 changes: 4 additions & 1 deletion arroyo-rpc/src/api_types/connections.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::formats::{Format, Framing};
use crate::formats::{BadData, Format, Framing};
use anyhow::bail;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -128,6 +128,7 @@ pub enum SchemaDefinition {
#[serde(rename_all = "camelCase")]
pub struct ConnectionSchema {
pub format: Option<Format>,
pub bad_data: Option<BadData>,
pub framing: Option<Framing>,
pub struct_name: Option<String>,
pub fields: Vec<SourceField>,
Expand All @@ -138,6 +139,7 @@ pub struct ConnectionSchema {
impl ConnectionSchema {
pub fn try_new(
format: Option<Format>,
bad_data: Option<BadData>,
framing: Option<Framing>,
struct_name: Option<String>,
fields: Vec<SourceField>,
Expand All @@ -146,6 +148,7 @@ impl ConnectionSchema {
) -> anyhow::Result<Self> {
let s = ConnectionSchema {
format,
bad_data,
framing,
struct_name,
fields,
Expand Down
23 changes: 23 additions & 0 deletions arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,29 @@ impl Format {
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BadData {
Fail {},
Drop {},
}

impl BadData {
pub fn from_opts(opts: &mut HashMap<String, String>) -> Result<Option<Self>, String> {
let Some(method) = opts.remove("bad_data") else {
return Ok(None);
};

let method = match method.as_str() {
"drop" => BadData::Drop {},
"fail" => BadData::Fail {},
f => return Err(format!("Unknown invalid data behavior '{}'", f)),
};

Ok(Some(method))
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct Framing {
Expand Down
4 changes: 3 additions & 1 deletion arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use std::{fs, time::SystemTime};

use crate::api_types::connections::PrimitiveType;
use crate::formats::{Format, Framing};
use crate::formats::{BadData, Format, Framing};
use crate::grpc::{LoadCompactedDataReq, SubtaskCheckpointMetadata};
use arroyo_types::CheckpointBarrier;
use grpc::{StopMode, TaskCheckpointEventType};
Expand Down Expand Up @@ -165,6 +165,7 @@ pub struct OperatorConfig {
pub connection: Value,
pub table: Value,
pub format: Option<Format>,
pub bad_data: Option<BadData>,
pub framing: Option<Framing>,
pub rate_limit: Option<RateLimit>,
}
Expand All @@ -175,6 +176,7 @@ impl Default for OperatorConfig {
connection: serde_json::from_str("{}").unwrap(),
table: serde_json::from_str("{}").unwrap(),
format: None,
bad_data: None,
framing: None,
rate_limit: None,
}
Expand Down
2 changes: 2 additions & 0 deletions arroyo-sql-macro/src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub fn get_json_schema_source() -> Result<Connection> {
})),
None,
None,
None,
struct_fields
.into_iter()
.map(|field| field.try_into().unwrap())
Expand Down Expand Up @@ -187,6 +188,7 @@ pub fn get_avro_source() -> Result<Connection> {
Some(Format::Avro(format)),
None,
None,
None,
struct_fields
.into_iter()
.map(|field| field.try_into().unwrap())
Expand Down
1 change: 1 addition & 0 deletions arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ pub fn get_test_expression(
let struct_def = test_struct_def();
let schema = ConnectionSchema {
format: Some(Format::Json(JsonFormat::default())),
bad_data: None,
framing: None,
struct_name: struct_def.name.clone(),
fields: struct_def
Expand Down
6 changes: 5 additions & 1 deletion arroyo-sql/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arroyo_datastream::{ConnectorOp, Operator};
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, SchemaDefinition, SourceField,
};
use arroyo_rpc::formats::{Format, Framing};
use arroyo_rpc::formats::{BadData, Format, Framing};
use datafusion::sql::sqlparser::ast::Query;
use datafusion::{
optimizer::{analyzer::Analyzer, optimizer::Optimizer, OptimizerContext},
Expand Down Expand Up @@ -190,6 +190,9 @@ impl ConnectorTable {

let format = Format::from_opts(options).map_err(|e| anyhow!("invalid format: '{e}'"))?;

let bad_data =
BadData::from_opts(options).map_err(|e| anyhow!("Invalid bad_data: '{e}'"))?;

let framing = Framing::from_opts(options).map_err(|e| anyhow!("invalid framing: '{e}'"))?;

let schema_fields: Result<Vec<SourceField>> = fields
Expand All @@ -209,6 +212,7 @@ impl ConnectorTable {

let schema = ConnectionSchema::try_new(
format,
bad_data,
framing,
None,
schema_fields?,
Expand Down

0 comments on commit 152090c

Please sign in to comment.