diff --git a/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs b/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs index a8de5c3ce1..283380fc6d 100644 --- a/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs +++ b/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs @@ -60,9 +60,7 @@ impl AirbyteSourceInterceptor { fn adapt_spec_response_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream::once(async { let message = get_airbyte_response(in_stream, |m| m.spec.is_some()).await?; - let spec = message - .spec - .ok_or(create_custom_error("unexpected spec response"))?; + let spec = message.spec.unwrap(); let mut resp = SpecResponse::default(); resp.endpoint_spec_schema_json = spec.connection_specification.to_string(); @@ -95,9 +93,7 @@ impl AirbyteSourceInterceptor { ) -> InterceptorStream { Box::pin(stream::once(async { let message = get_airbyte_response(in_stream, |m| m.catalog.is_some()).await?; - let catalog = message - .catalog - .ok_or(create_custom_error("unexpected discover response."))?; + let catalog = message.catalog.unwrap(); let mut resp = DiscoverResponse::default(); for stream in catalog.streams { @@ -162,9 +158,7 @@ impl AirbyteSourceInterceptor { let message = get_airbyte_response(in_stream, |m| m.connection_status.is_some()).await?; - let connection_status = message - .connection_status - .ok_or(create_custom_error("unexpected validate response."))?; + let connection_status = message.connection_status.unwrap(); if connection_status.status != Status::Succeeded { return raise_err(&format!("validation failed {:?}", connection_status)); @@ -280,14 +274,14 @@ impl AirbyteSourceInterceptor { ) -> InterceptorStream { let airbyte_message_stream = Box::pin(stream_airbyte_responses(in_stream)); - // transaction_pending is true if the connector writes output messages and exits _without_ writing - // a final state checkpoint. Box::pin(stream::try_unfold( (false, stream_to_binding, airbyte_message_stream), |(transaction_pending, stb, mut stream)| async move { let message = match stream.next().await { Some(m) => m?, None => { + // transaction_pending is true if the connector writes output messages and exits _without_ writing + // a final state checkpoint. if transaction_pending { // We generate a synthetic commit now, and the empty checkpoint means the assumed behavior // of the next invocation will be "full refresh". @@ -316,26 +310,23 @@ impl AirbyteSourceInterceptor { Ok(Some((encode_message(&resp)?, (false, stb, stream)))) } else if let Some(record) = message.record { let stream_to_binding = stb.lock().await; - match stream_to_binding.get(&record.stream) { - None => { - raise_err(&format!( + let binding = + stream_to_binding + .get(&record.stream) + .ok_or(create_custom_error(&format!( "connector record with unknown stream {}", record.stream - ))?; - } - Some(binding) => { - let arena = record.data.get().as_bytes().to_vec(); - let arena_len: u32 = arena.len() as u32; - resp.documents = Some(Documents { - binding: *binding as u32, - arena: arena, - docs_json: vec![Slice { - begin: 0, - end: arena_len, - }], - }) - } - } + )))?; + let arena = record.data.get().as_bytes().to_vec(); + let arena_len: u32 = arena.len() as u32; + resp.documents = Some(Documents { + binding: *binding as u32, + arena: arena, + docs_json: vec![Slice { + begin: 0, + end: arena_len, + }], + }); drop(stream_to_binding); Ok(Some((encode_message(&resp)?, (true, stb, stream)))) } else {