Validate schema and encoding issues when testing kafka connections #460

Merged: 2 commits, Dec 19, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 6 additions & 3 deletions arroyo-api/src/connection_tables.rs
@@ -604,8 +604,7 @@ async fn get_schema(
"confluent" => {
let c: ConfluentProfile =
serde_json::from_value(profile_config.clone()).expect("invalid confluent config");
c.try_into()
.expect("unable to convert confluent config into kafka config")
c.into()
}
_ => {
return Err(bad_request(
@@ -640,7 +639,11 @@ async fn get_schema(
resolver.get_schema_for_version(None).await.map_err(|e| {
bad_request(format!(
"failed to fetch schemas from schema repository: {}",
e
e.chain()
.into_iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(": ")
))
})
}
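Two things change in this file: the Confluent profile conversion no longer panics through `expect` (the switch from `c.try_into()` to `c.into()` suggests the conversion is now infallible, presumably via a `From` impl updated outside this diff), and schema-fetch failures now report the full `anyhow` cause chain joined with `": "` instead of only the outermost message.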
10 changes: 4 additions & 6 deletions arroyo-api/src/pipelines.rs
@@ -32,7 +32,7 @@ use arroyo_formats::json::arrow_to_json_schema;
use arroyo_rpc::formats::Format;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, ConfluentSchemaType};
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{error_chain, OperatorConfig};
use arroyo_server_common::log_event;
use arroyo_sql::types::StructDef;
use arroyo_sql::{has_duplicate_udf_names, ArroyoSchemaProvider, CompiledSql, SqlConfig};
@@ -194,8 +194,7 @@ async fn try_register_confluent_schema(

let id = schema_registry
.write_schema(schema.canonical_form(), ConfluentSchemaType::Avro)
.await
.map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?;
.await?;

avro.schema_id = Some(id as u32);
config.format = Some(Format::Avro(avro))
@@ -209,8 +208,7 @@

let id = schema_registry
.write_schema(schema.to_string(), ConfluentSchemaType::Json)
.await
.map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?;
.await?;

json.schema_id = Some(id as u32);
config.format = Some(Format::Json(json))
@@ -346,7 +344,7 @@ pub(crate) async fn create_pipeline<'a>(
message: format!(
"Failed to register schemas with the schema registry. Make sure \
that the schema_registry is configured correctly and running.\nDetails: {}",
e
error_chain(e)
),
})?;

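The `error_chain` helper imported above is not shown in this diff; a plausible reading is that it packages the same chain-joining logic used inline in connection_tables.rs. A minimal sketch under that assumption:

```rust
use anyhow::anyhow;

// Hypothetical sketch of arroyo_rpc::error_chain (the real definition is not
// shown in this diff). Assumption: it joins every error in the anyhow cause
// chain, mirroring the inline `e.chain()...join(": ")` seen above.
pub fn error_chain(e: anyhow::Error) -> String {
    e.chain()
        .map(|cause| cause.to_string())
        .collect::<Vec<_>>()
        .join(": ")
}

fn main() {
    let err = anyhow!("connection refused")
        .context("failed to reach schema registry")
        .context("failed to register schemas");
    // Prints: failed to register schemas: failed to reach schema registry: connection refused
    println!("{}", error_chain(err));
}
```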
1 change: 1 addition & 0 deletions arroyo-connectors/Cargo.toml
@@ -12,6 +12,7 @@ arroyo-types = { path = "../arroyo-types" }
arroyo-storage = { path = "../arroyo-storage" }
arroyo-rpc = { path = "../arroyo-rpc" }
arroyo-datastream = { path = "../arroyo-datastream" }
arroyo-formats = { path = "../arroyo-formats" }

chrono = "0.4"
serde = { version = "1", features = ["derive"] }
4 changes: 2 additions & 2 deletions arroyo-connectors/src/confluent.rs
@@ -136,7 +136,7 @@ impl Connector for ConfluentConnector {
_: &str,
config: Self::ProfileT,
mut table: Self::TableT,
_: Option<&ConnectionSchema>,
schema: Option<&ConnectionSchema>,
tx: Sender<Result<Event, Infallible>>,
) {
table
@@ -146,7 +146,7 @@
connection: config.into(),
};

tester.start(table, tx);
tester.start(table, schema.cloned(), tx);
}

fn test_profile(&self, profile: Self::ProfileT) -> Option<Receiver<TestSourceMessage>> {
155 changes: 128 additions & 27 deletions arroyo-connectors/src/kafka.rs
@@ -1,21 +1,24 @@
use anyhow::{anyhow, bail};
use arroyo_formats::avro::deserialize_slice_avro;
use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::schema_resolver::ConfluentSchemaRegistryClient;
use arroyo_rpc::{var_str::VarStr, OperatorConfig};
use arroyo_rpc::formats::{Format, JsonFormat};
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistryClient, FailingSchemaResolver};
use arroyo_rpc::{schema_resolver, var_str::VarStr, OperatorConfig};
use axum::response::sse::Event;
use futures::TryFutureExt;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
ClientConfig, Offset, TopicPartitionList,
ClientConfig, Message, Offset, TopicPartitionList,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, Mutex};
use tonic::Status;
use tracing::{error, info, warn};
use typify::import_types;
@@ -262,12 +265,12 @@ impl Connector for KafkaConnector {
_: &str,
config: Self::ProfileT,
table: Self::TableT,
_: Option<&ConnectionSchema>,
schema: Option<&ConnectionSchema>,
tx: Sender<Result<Event, Infallible>>,
) {
let tester = KafkaTester { connection: config };

tester.start(table, tx);
tester.start(table, schema.cloned(), tx);
}

fn table_type(&self, _: Self::ProfileT, table: Self::TableT) -> ConnectionType {
@@ -431,26 +434,112 @@ impl KafkaTester {
Ok(())
}

pub async fn validate_schema(
&self,
table: &KafkaTable,
format: &Format,
msg: Vec<u8>,
) -> anyhow::Result<()> {
match format {
Format::Json(JsonFormat {
confluent_schema_registry,
..
}) => {
if *confluent_schema_registry {
if msg[0] != 0 {
bail!("Message appears to be encoded as normal JSON, rather than SR-JSON, but the schema registry is enabled. Ensure that the format and schema type are correct.");
}
serde_json::from_slice::<Value>(&msg[5..]).map_err(|e|
anyhow!("Failed to parse message as schema-registry JSON (SR-JSON): {:?}. Ensure that the format and schema type are correct.", e))?;
} else if msg[0] == 0 {
bail!("Message is not valid JSON. It may be encoded as SR-JSON, but the schema registry is not enabled. Ensure that the format and schema type are correct.");
} else {
serde_json::from_slice(&msg).map_err(|e|
anyhow!("Failed to parse message as JSON: {:?}. Ensure that the format and schema type are correct.", e))?;
}
}
Format::Avro(avro) => {
if avro.confluent_schema_registry {
let schema_resolver = match &self.connection.schema_registry_enum {
Some(SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
api_secret,
}) => schema_resolver::ConfluentSchemaRegistry::new(
endpoint,
&table.topic,
api_key.clone(),
api_secret.clone(),
),
_ => {
bail!(
"schema registry is enabled, but no schema registry is configured"
);
}
}
.map_err(|e| anyhow!("Failed to construct schema registry: {:?}", e))?;

if msg[0] != 0 {
bail!("Message appears to be encoded as normal Avro, rather than SR-Avro, but the schema registry is enabled. Ensure that the format and schema type are correct.");
}

let schema_registry = Arc::new(Mutex::new(HashMap::new()));

let _ = deserialize_slice_avro::<Value>(avro, schema_registry, Arc::new(schema_resolver), &msg)
.await
.map_err(|e|
anyhow!("Failed to parse message as schema-registry Avro (SR-Avro): {:?}. Ensure that the format and schema type are correct.", e))?;
} else {
let resolver = Arc::new(FailingSchemaResolver::new());
let registry = Arc::new(Mutex::new(HashMap::new()));

let _ = deserialize_slice_avro::<Value>(avro, registry, resolver, &msg)
.await
.map_err(|e|
if msg[0] == 0 {
anyhow!("Failed to parse message as regular Avro. It may be encoded as SR-Avro, but the schema registry is not enabled. Ensure that the format and schema type are correct.")
} else {
anyhow!("Failed to parse message as Avro: {:?}. Ensure that the format and schema type are correct.", e)
})?;
}
}
Format::Parquet(_) => {
unreachable!()
}
Format::RawString(_) => {
String::from_utf8(msg).map_err(|e|
anyhow!("Failed to parse message as UTF-8: {:?}. Ensure that the format and schema type are correct.", e))?;
}
};

Ok(())
}
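Both the SR-JSON and SR-Avro branches rely on the Confluent wire format: a schema-registry-framed message starts with a zero magic byte, followed by a 4-byte big-endian schema ID, with the payload beginning at offset 5 (hence the `msg[0]` checks and the `&msg[5..]` slice above). An illustrative parser, not part of this PR:

```rust
/// Illustrative only (not part of this PR): split a Confluent
/// schema-registry-framed message into its schema ID and payload.
/// Framing: [0x00 magic byte][4-byte big-endian schema id][payload].
fn parse_confluent_frame(msg: &[u8]) -> Option<(u32, &[u8])> {
    if msg.len() < 5 || msg[0] != 0 {
        return None; // too short, or not SR-framed
    }
    let schema_id = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    Some((schema_id, &msg[5..]))
}

fn main() {
    // 0x00 magic byte, schema id 7, then the JSON payload `{}`.
    let mut framed = vec![0u8, 0, 0, 0, 7];
    framed.extend_from_slice(b"{}");
    assert_eq!(parse_confluent_frame(&framed), Some((7, &b"{}"[..])));
    assert_eq!(parse_confluent_frame(b"{}"), None); // plain JSON, not framed
}
```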

async fn test(
&self,
table: KafkaTable,
schema: Option<ConnectionSchema>,
mut tx: Sender<Result<Event, Infallible>>,
) -> Result<(), String> {
let client = self.connect().await?;
) -> anyhow::Result<()> {
let format = schema
.and_then(|s| s.format)
.ok_or_else(|| anyhow!("No format defined for Kafka connection"))?;

let client = self.connect().await.map_err(|e| anyhow!("{}", e))?;

self.info(&mut tx, "Connected to Kafka").await;

let topic = table.topic.clone();

let metadata = client
.fetch_metadata(Some(&topic), Duration::from_secs(10))
.map_err(|e| format!("Failed to fetch metadata: {:?}", e))?;
.map_err(|e| anyhow!("Failed to fetch metadata: {:?}", e))?;

self.info(&mut tx, "Fetched topic metadata").await;

{
let topic_metadata = metadata.topics().get(0).ok_or_else(|| {
format!(
anyhow!(
"Returned metadata was empty; unable to subscribe to topic '{}'",
topic
)
@@ -461,17 +550,17 @@
rdkafka::types::RDKafkaRespErr::RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
| rdkafka::types::RDKafkaRespErr::RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC
| rdkafka::types::RDKafkaRespErr::RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART => {
return Err(format!(
bail!(
"Topic '{}' does not exist in the configured Kafka cluster",
topic
));
);
}
e => {
error!("Unhandled Kafka error while fetching metadata: {:?}", e);
return Err(format!(
bail!(
"Something went wrong while fetching topic metadata: {:?}",
e
));
);
}
}
}
@@ -484,7 +573,7 @@

client
.assign(&TopicPartitionList::from_topic_map(&map).unwrap())
.map_err(|e| format!("Failed to subscribe to topic '{}': {:?}", topic, e))?;
.map_err(|e| anyhow!("Failed to subscribe to topic '{}': {:?}", topic, e))?;
}

if let TableType::Source { .. } = table.type_ {
@@ -496,12 +585,24 @@
match client.poll(Duration::ZERO) {
Some(Ok(message)) => {
self.info(&mut tx, "Received message from Kafka").await;
self.test_schema(message)?;
self.validate_schema(
&table,
&format,
message
.detach()
.payload()
.ok_or_else(|| anyhow!("received message with empty payload"))?
.to_vec(),
)
.await?;

self.info(&mut tx, "Successfully validated message schema")
.await;
return Ok(());
}
Some(Err(e)) => {
warn!("Error while reading from kafka in test: {:?}", e);
return Err(format!("Error while reading messages from Kafka: {}", e));
return Err(anyhow!("Error while reading messages from Kafka: {}", e));
}
None => {
// wait
@@ -510,7 +611,7 @@
}
}

return Err(format!(
return Err(anyhow!(
"No messages received from Kafka within {} seconds",
timeout.as_secs()
));
@@ -519,11 +620,6 @@ impl KafkaTester {
Ok(())
}

fn test_schema(&self, _: BorrowedMessage) -> Result<(), String> {
// TODO: test the schema against the message
Ok(())
}

async fn info(&self, tx: &mut Sender<Result<Event, Infallible>>, s: impl Into<String>) {
send(
tx,
@@ -552,16 +648,21 @@ impl KafkaTester {
}
}

pub fn start(self, table: KafkaTable, mut tx: Sender<Result<Event, Infallible>>) {
pub fn start(
self,
table: KafkaTable,
schema: Option<ConnectionSchema>,
mut tx: Sender<Result<Event, Infallible>>,
) {
tokio::spawn(async move {
info!("Started kafka tester");
if let Err(e) = self.test(table, tx.clone()).await {
if let Err(e) = self.test(table, schema, tx.clone()).await {
send(
&mut tx,
TestSourceMessage {
error: true,
done: true,
message: e,
message: e.to_string(),
},
)
.await;
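Taken together, the kafka.rs changes replace the old placeholder `test_schema` (which accepted any message) with real validation: the tester now requires a format on the connection schema, reads an actual message payload from the topic, and checks its encoding against the configured format and schema registry settings before reporting success.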
6 changes: 4 additions & 2 deletions arroyo-console/src/components/StartPipelineModal.tsx
@@ -37,7 +37,7 @@ const StartPipelineModal: React.FC<StartPipelineModalProps> = ({
start,
}) => {
return (
<Modal isOpen={isOpen} onClose={onClose} isCentered>
<Modal isOpen={isOpen} onClose={onClose} isCentered size={startError ? '4xl' : 'xl'}>
<ModalOverlay />
<ModalContent>
<ModalHeader>Start Pipeline</ModalHeader>
@@ -47,7 +47,9 @@
{startError ? (
<Alert status="error">
<AlertIcon />
<AlertDescription>{startError}</AlertDescription>
<AlertDescription overflowY={'auto'} maxH={400} whiteSpace={'pre-wrap'}>
{startError}
</AlertDescription>
</Alert>
) : null}

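Since the backend can now return multi-line error chains, the modal widens to `4xl` when an error is present and the alert body becomes scrollable with `pre-wrap` whitespace so the details render readably.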
5 changes: 4 additions & 1 deletion arroyo-console/src/lib/data_fetching.ts
@@ -251,7 +251,10 @@ const connectionProfileAutocompleteFetcher = () => {
export const useConnectionProfileAutocomplete = (id: string) => {
const { data, error } = useSWR<schemas['ConnectionAutocompleteResp']>(
connectionProfileAutocompleteKey(id),
connectionProfileAutocompleteFetcher()
connectionProfileAutocompleteFetcher(),
{
revalidateOnMount: true,
}
);

return {
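`revalidateOnMount: true` makes SWR refetch the autocomplete data each time the hook mounts instead of relying on a cached value, presumably so newly created connection profiles and topics show up without a page refresh.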