Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Dec 19, 2023
1 parent 54e1937 commit 0e7e658
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 14 deletions.
10 changes: 4 additions & 6 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arroyo_formats::json::arrow_to_json_schema;
use arroyo_rpc::formats::Format;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::schema_resolver::{ConfluentSchemaRegistry, ConfluentSchemaType};
use arroyo_rpc::OperatorConfig;
use arroyo_rpc::{error_chain, OperatorConfig};
use arroyo_server_common::log_event;
use arroyo_sql::types::StructDef;
use arroyo_sql::{has_duplicate_udf_names, ArroyoSchemaProvider, CompiledSql, SqlConfig};
Expand Down Expand Up @@ -194,8 +194,7 @@ async fn try_register_confluent_schema(

let id = schema_registry
.write_schema(schema.canonical_form(), ConfluentSchemaType::Avro)
.await
.map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?;
.await?;

avro.schema_id = Some(id as u32);
config.format = Some(Format::Avro(avro))
Expand All @@ -209,8 +208,7 @@ async fn try_register_confluent_schema(

let id = schema_registry
.write_schema(schema.to_string(), ConfluentSchemaType::Json)
.await
.map_err(|e| anyhow!("Failed to write schema to schema registry: {}", e))?;
.await?;

json.schema_id = Some(id as u32);
config.format = Some(Format::Json(json))
Expand Down Expand Up @@ -346,7 +344,7 @@ pub(crate) async fn create_pipeline<'a>(
message: format!(
"Failed to register schemas with the schema registry. Make sure \
that the schema_registry is configured correctly and running.\nDetails: {}",
e
error_chain(e)
),
})?;

Expand Down
2 changes: 1 addition & 1 deletion arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl KafkaTester {
.await
.map_err(|e|
if msg[0] == 0 {
anyhow!("Failed to parse message are regular Avro. It may be encoded as SR-Avro, but the schema registry is not enabled. Ensure that the format and schema type are correct.")
anyhow!("Failed to parse message as regular Avro. It may be encoded as SR-Avro, but the schema registry is not enabled. Ensure that the format and schema type are correct.")
} else {
anyhow!("Failed to parse message as Avro: {:?}. Ensure that the format and schema type are correct.", e)
})?;
Expand Down
6 changes: 4 additions & 2 deletions arroyo-console/src/components/StartPipelineModal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const StartPipelineModal: React.FC<StartPipelineModalProps> = ({
start,
}) => {
return (
<Modal isOpen={isOpen} onClose={onClose} isCentered>
<Modal isOpen={isOpen} onClose={onClose} isCentered size={startError ? '4xl' : 'xl'}>
<ModalOverlay />
<ModalContent>
<ModalHeader>Start Pipeline</ModalHeader>
Expand All @@ -47,7 +47,9 @@ const StartPipelineModal: React.FC<StartPipelineModalProps> = ({
{startError ? (
<Alert status="error">
<AlertIcon />
<AlertDescription>{startError}</AlertDescription>
<AlertDescription overflowY={"auto"} maxH={400} whiteSpace={'pre-wrap'}>{
startError
}</AlertDescription>
</Alert>
) : null}

Expand Down
7 changes: 5 additions & 2 deletions arroyo-console/src/lib/data_fetching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ const pingFetcher = async () => {

export const usePing = () => {
const { data, error, isLoading } = useSWR('ping', pingFetcher, {
refreshInterval: 100000000,
refreshInterval: 1000,
onErrorRetry: (error, key, config, revalidate, {}) => {
// explicitly define this function to override the exponential backoff
setTimeout(() => revalidate(), 1000);
Expand Down Expand Up @@ -251,7 +251,10 @@ const connectionProfileAutocompleteFetcher = () => {
export const useConnectionProfileAutocomplete = (id: string) => {
const { data, error } = useSWR<schemas['ConnectionAutocompleteResp']>(
connectionProfileAutocompleteKey(id),
connectionProfileAutocompleteFetcher()
connectionProfileAutocompleteFetcher(),
{
revalidateOnMount: true,
}
);

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export function ConfluentSchemaEditor({
}) {
return (
<Stack spacing={4} maxW="lg">
<Text>Schemas will be loaded from the configured Confluent Schema Registry</Text>
<Text>Schemas will be loaded from and written to the configured Confluent Schema Registry</Text>

<Button colorScheme="green" onClick={() => next()}>
Continue
Expand Down
8 changes: 8 additions & 0 deletions arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,11 @@ impl Default for OperatorConfig {
}
}
}

pub fn error_chain(e: anyhow::Error) -> String {
e.chain()
.into_iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(": ")
}
8 changes: 6 additions & 2 deletions arroyo-rpc/src/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,11 @@ impl ConfluentSchemaRegistryClient {

match status {
StatusCode::CONFLICT => {
bail!("{}", body)
bail!(
"there is already an existing schema for this topic which is \
incompatible with the new schema being registered:\n\n{}",
body
)
}
StatusCode::UNPROCESSABLE_ENTITY => {
bail!("invalid schema: {}", body);
Expand Down Expand Up @@ -333,7 +337,7 @@ impl ConfluentSchemaRegistry {
self.client
.write_schema(self.topic_endpoint(), schema, schema_type)
.await
.context(format!("failed to write schema for topic '{}'", self.topic))
.context(format!("topic '{}'", self.topic))
}

pub async fn get_schema_for_id(
Expand Down

0 comments on commit 0e7e658

Please sign in to comment.