Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Apr 12, 2022
1 parent 1c9bbba commit 97574d1
Showing 1 changed file with 20 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ impl AirbyteSourceInterceptor {
fn adapt_spec_response_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream {
Box::pin(stream::once(async {
let message = get_airbyte_response(in_stream, |m| m.spec.is_some()).await?;
let spec = message
.spec
.ok_or(create_custom_error("unexpected spec response"))?;
let spec = message.spec.unwrap();

let mut resp = SpecResponse::default();
resp.endpoint_spec_schema_json = spec.connection_specification.to_string();
Expand Down Expand Up @@ -95,9 +93,7 @@ impl AirbyteSourceInterceptor {
) -> InterceptorStream {
Box::pin(stream::once(async {
let message = get_airbyte_response(in_stream, |m| m.catalog.is_some()).await?;
let catalog = message
.catalog
.ok_or(create_custom_error("unexpected discover response."))?;
let catalog = message.catalog.unwrap();

let mut resp = DiscoverResponse::default();
for stream in catalog.streams {
Expand Down Expand Up @@ -162,9 +158,7 @@ impl AirbyteSourceInterceptor {
let message =
get_airbyte_response(in_stream, |m| m.connection_status.is_some()).await?;

let connection_status = message
.connection_status
.ok_or(create_custom_error("unexpected validate response."))?;
let connection_status = message.connection_status.unwrap();

if connection_status.status != Status::Succeeded {
return raise_err(&format!("validation failed {:?}", connection_status));
Expand Down Expand Up @@ -280,14 +274,14 @@ impl AirbyteSourceInterceptor {
) -> InterceptorStream {
let airbyte_message_stream = Box::pin(stream_airbyte_responses(in_stream));

// transaction_pending is true if the connector writes output messages and exits _without_ writing
// a final state checkpoint.
Box::pin(stream::try_unfold(
(false, stream_to_binding, airbyte_message_stream),
|(transaction_pending, stb, mut stream)| async move {
let message = match stream.next().await {
Some(m) => m?,
None => {
// transaction_pending is true if the connector writes output messages and exits _without_ writing
// a final state checkpoint.
if transaction_pending {
// We generate a synthetic commit now, and the empty checkpoint means the assumed behavior
// of the next invocation will be "full refresh".
Expand Down Expand Up @@ -316,26 +310,23 @@ impl AirbyteSourceInterceptor {
Ok(Some((encode_message(&resp)?, (false, stb, stream))))
} else if let Some(record) = message.record {
let stream_to_binding = stb.lock().await;
match stream_to_binding.get(&record.stream) {
None => {
raise_err(&format!(
let binding =
stream_to_binding
.get(&record.stream)
.ok_or(create_custom_error(&format!(
"connector record with unknown stream {}",
record.stream
))?;
}
Some(binding) => {
let arena = record.data.get().as_bytes().to_vec();
let arena_len: u32 = arena.len() as u32;
resp.documents = Some(Documents {
binding: *binding as u32,
arena: arena,
docs_json: vec![Slice {
begin: 0,
end: arena_len,
}],
})
}
}
)))?;
let arena = record.data.get().as_bytes().to_vec();
let arena_len: u32 = arena.len() as u32;
resp.documents = Some(Documents {
binding: *binding as u32,
arena: arena,
docs_json: vec![Slice {
begin: 0,
end: arena_len,
}],
});
drop(stream_to_binding);
Ok(Some((encode_message(&resp)?, (true, stb, stream))))
} else {
Expand Down

0 comments on commit 97574d1

Please sign in to comment.