Skip to content

Commit

Permalink
Fix #252 kafka sink user error (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
breezykermo committed Jan 8, 2024
1 parent 82f8716 commit 261a0bf
Showing 1 changed file with 60 additions and 27 deletions.
87 changes: 60 additions & 27 deletions arroyo-worker/src/connectors/kafka/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arroyo_formats::SchemaData;
use arroyo_macro::process_fn;
use arroyo_rpc::formats::Format;
use arroyo_rpc::grpc::{TableDeleteBehavior, TableDescriptor, TableWriteBehavior};
use arroyo_rpc::{CheckpointEvent, ControlMessage, OperatorConfig};
use arroyo_rpc::{CheckpointEvent, ControlMessage, ControlResp, OperatorConfig};
use arroyo_types::*;
use std::collections::HashMap;
use std::marker::PhantomData;
Expand Down Expand Up @@ -174,24 +174,40 @@ impl<K: Key + Serialize, T: SchemaData + Serialize> KafkaSinkFunc<K, T> {
}

async fn handle_checkpoint(&mut self, _: &CheckpointBarrier, ctx: &mut Context<(), ()>) {
self.flush().await;
if let ConsistencyMode::ExactlyOnce {
next_transaction_index,
producer_to_complete,
} = &mut self.consistency_mode
{
*producer_to_complete = self.producer.take();
ctx.state
.get_global_keyed_state('i')
.await
.insert(ctx.task_info.task_index, *next_transaction_index)
.await;
self.init_producer(&ctx.task_info)
.expect("creating new producer during checkpointing");
match self.flush().await {
Ok(_) => {
if let ConsistencyMode::ExactlyOnce {
next_transaction_index,
producer_to_complete,
} = &mut self.consistency_mode
{
*producer_to_complete = self.producer.take();
ctx.state
.get_global_keyed_state('i')
.await
.insert(ctx.task_info.task_index, *next_transaction_index)
.await;
self.init_producer(&ctx.task_info)
.expect("creating new producer during checkpointing");
}
}
Err(e) => {
ctx.control_tx
.send(ControlResp::Error {
operator_id: ctx.task_info.operator_id.clone(),
task_index: ctx.task_info.task_index,
message: e.name.clone(),
details: e.details.clone(),
})
.await
.unwrap();

panic!("{}: {}", e.name, e.details);
}
}
}

async fn flush(&mut self) {
async fn flush(&mut self) -> Result<(), UserError> {
self.producer
.as_ref()
.unwrap()
Expand All @@ -202,16 +218,15 @@ impl<K: Key + Serialize, T: SchemaData + Serialize> KafkaSinkFunc<K, T> {

// ensure all messages were delivered before finishing the checkpoint
for future in self.write_futures.drain(..) {
match future.await.expect("Kafka producer shut down") {
Ok(_) => {}
Err((e, _)) => {
panic!("Unhandled kafka error: {:?}", e);
}
}
future
.await
.unwrap()
.map_err(|e| UserError::new("Kafka producer shut down", format!("{:?}", e)))?;
}
Ok(())
}

async fn publish(&mut self, k: Option<String>, v: Vec<u8>) {
async fn publish(&mut self, k: Option<String>, v: Vec<u8>) -> Result<(), UserError> {
let mut rec = {
if let Some(k) = k.as_ref() {
FutureRecord::to(&self.topic).key(k).payload(&v)
Expand All @@ -224,13 +239,16 @@ impl<K: Key + Serialize, T: SchemaData + Serialize> KafkaSinkFunc<K, T> {
match self.producer.as_mut().unwrap().send_result(rec) {
Ok(future) => {
self.write_futures.push(future);
return;
return Ok(());
}
Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), f)) => {
rec = f;
}
Err((e, _)) => {
panic!("Unhandled kafka error: {:?}", e);
return Err(UserError::new(
"Could not write to Kafka",
format!("{:?}", e),
));
}
}

Expand All @@ -239,15 +257,30 @@ impl<K: Key + Serialize, T: SchemaData + Serialize> KafkaSinkFunc<K, T> {
}
}

async fn process_element(&mut self, record: &Record<K, T>, _ctx: &mut Context<(), ()>) {
async fn process_element(&mut self, record: &Record<K, T>, ctx: &mut Context<(), ()>) {
let k = record
.key
.as_ref()
.map(|k| serde_json::to_string(k).unwrap());
let v = self.serializer.to_vec(&record.value);

if let Some(v) = v {
self.publish(k, v).await;
match self.publish(k, v).await {
Ok(_) => {}
Err(e) => {
ctx.control_tx
.send(ControlResp::Error {
operator_id: ctx.task_info.operator_id.clone(),
task_index: ctx.task_info.task_index,
message: e.name.clone(),
details: e.details.clone(),
})
.await
.unwrap();

panic!("{}: {}", e.name, e.details);
}
};
}
}

Expand Down

0 comments on commit 261a0bf

Please sign in to comment.