Commit

Tests
mwylde committed Jan 18, 2024
1 parent c0192f4 commit 5f244ea
Showing 14 changed files with 124 additions and 149 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

19 changes: 10 additions & 9 deletions arroyo-api/src/pipelines.rs
@@ -10,6 +10,7 @@ use http::StatusCode;

use petgraph::Direction;
use std::collections::HashMap;
+use std::env;

use std::time::Duration;

@@ -331,15 +332,15 @@ pub(crate) async fn create_pipeline<'a>(

set_parallelism(&mut compiled.program, 1);

-// if is_preview {
-//     for node in compiled.program.graph.node_weights_mut() {
-//         // replace all sink connectors with websink for preview
-//         if node.operator_name == OperatorName::ConnectorSink {
-//             node.operator_config =
-//                 api::ConnectorOp::from(ConnectorOp::web_sink()).encode_to_vec();
-//         }
-//     }
-// }
+if is_preview && !env::var("PREVIEW_SINKS").is_ok_and(|s| s == "true") {
+    for node in compiled.program.graph.node_weights_mut() {
+        // replace all sink connectors with websink for preview
+        if node.operator_name == OperatorName::ConnectorSink {
+            node.operator_config =
+                api::ConnectorOp::from(ConnectorOp::web_sink()).encode_to_vec();
+        }
+    }
+}

register_schemas(&mut compiled)
.await
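The previously commented-out rewrite is restored here, gated on an environment variable: in preview pipelines, every ConnectorSink is replaced with a web sink unless PREVIEW_SINKS is set to exactly "true". A minimal standalone sketch of the gate (the helper name is hypothetical; the env check is the one used above):

```rust
use std::env;

// Hypothetical helper mirroring the gate in create_pipeline.
fn replace_sinks_for_preview(is_preview: bool) -> bool {
    // Any value other than exactly "true" (including an unset variable)
    // leaves the web-sink replacement enabled for preview pipelines.
    is_preview && !env::var("PREVIEW_SINKS").is_ok_and(|s| s == "true")
}

fn main() {
    env::remove_var("PREVIEW_SINKS");
    assert!(replace_sinks_for_preview(true)); // unset: replace sinks

    env::set_var("PREVIEW_SINKS", "true");
    assert!(!replace_sinks_for_preview(true)); // opted in: keep real sinks

    env::set_var("PREVIEW_SINKS", "1");
    assert!(replace_sinks_for_preview(true)); // only "true" disables it

    assert!(!replace_sinks_for_preview(false)); // non-preview is never rewritten
}
```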
2 changes: 1 addition & 1 deletion arroyo-formats/src/lib.rs
@@ -9,7 +9,7 @@ use arrow_schema::Schema;
use arroyo_rpc::formats::{AvroFormat, BadData, Format, Framing, FramingMethod, JsonFormat};
use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, SchemaResolver};
use arroyo_rpc::ArroyoSchema;
-use arroyo_types::{Data, Debezium, RawJson, should_flush, SourceError, to_nanos};
+use arroyo_types::{should_flush, to_nanos, Data, Debezium, RawJson, SourceError};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::json;
4 changes: 2 additions & 2 deletions arroyo-formats/src/old.rs
@@ -1,13 +1,13 @@
-use crate::{avro, FramingIterator, json, SchemaData};
+use crate::{avro, json, FramingIterator, SchemaData};
use arroyo_rpc::formats::{AvroFormat, Format, Framing};
use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, SchemaResolver};
use arroyo_types::SourceError;
use serde::de::DeserializeOwned;
+use serde_json::Value;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::Mutex;
-use serde_json::Value;

fn deserialize_raw_string<T: DeserializeOwned>(msg: &[u8]) -> Result<T, String> {
let json = json! {
26 changes: 17 additions & 9 deletions arroyo-formats/src/serialize/mod.rs
@@ -1,9 +1,9 @@
+use crate::{avro, json};
use arrow_array::RecordBatch;
use arrow_json::writer::record_batches_to_json_rows;
-use serde_json::Value;
-use arroyo_rpc::{TIMESTAMP_FIELD};
use arroyo_rpc::formats::{Format, JsonFormat};
-use crate::{avro, json};
+use arroyo_rpc::TIMESTAMP_FIELD;
+use serde_json::Value;

pub struct ArrowSerializer {
kafka_schema: Option<Value>,
@@ -27,23 +27,31 @@ impl ArrowSerializer {

pub fn serialize(&mut self, batch: &RecordBatch) -> Box<dyn Iterator<Item = Vec<u8>> + Send> {
if self.kafka_schema.is_none() {
-self.kafka_schema = Some(json::arrow_to_kafka_json("ArroyoJson", batch.schema().fields()));
+self.kafka_schema = Some(json::arrow_to_kafka_json(
+    "ArroyoJson",
+    batch.schema().fields(),
+));
}
if self.avro_schema.is_none() {
-self.avro_schema = Some(avro::arrow_to_avro_schema("ArroyoAvro", batch.schema().fields()));
+self.avro_schema = Some(avro::arrow_to_avro_schema(
+    "ArroyoAvro",
+    batch.schema().fields(),
+));
}

match &self.format {
-Format::Json(json) => {
-    self.serialize_json(json, batch)
-}
+Format::Json(json) => self.serialize_json(json, batch),
Format::Avro(_) => todo!("avro"),
Format::Parquet(_) => todo!("parquet"),
Format::RawString(_) => todo!("raw string"),
}
}

-fn serialize_json(&self, json: &JsonFormat, batch: &RecordBatch) -> Box<dyn Iterator<Item = Vec<u8>> + Send> {
+fn serialize_json(
+    &self,
+    json: &JsonFormat,
+    batch: &RecordBatch,
+) -> Box<dyn Iterator<Item = Vec<u8>> + Send> {
let header = json.confluent_schema_registry.then(|| {
if json.include_schema {
unreachable!("can't include schema when writing to confluent schema registry, should've been caught when creating JsonFormat");
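The serialize path above now derives the Kafka and Avro schemas lazily from the first batch and caches them for every later call. A small sketch of that lazy-caching pattern, assuming a stand-in derive_schema in place of json::arrow_to_kafka_json / avro::arrow_to_avro_schema:

```rust
// Minimal sketch of the lazy schema caching in ArrowSerializer::serialize;
// derive_schema stands in for the real schema-conversion functions.
struct LazySchema {
    cached: Option<String>,
}

impl LazySchema {
    fn schema_for(&mut self, arrow_schema: &str) -> &str {
        // Computed from the first batch only; later batches hit the cache.
        self.cached.get_or_insert_with(|| derive_schema(arrow_schema))
    }
}

fn derive_schema(arrow_schema: &str) -> String {
    format!("derived({arrow_schema})")
}

fn main() {
    let mut s = LazySchema { cached: None };
    assert_eq!(s.schema_for("schema-a"), "derived(schema-a)");
    // The cached value is reused; this is sound in the serializer because
    // every batch flowing through an operator shares one Arrow schema.
    assert_eq!(s.schema_for("schema-b"), "derived(schema-a)");
}
```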
1 change: 1 addition & 0 deletions arroyo-worker/Cargo.toml
@@ -92,6 +92,7 @@ datafusion-physical-plan = "34.0"
datafusion-common = "34.0"
datafusion-execution = "34.0"
base64 = "0.21.5"
+itertools = "0.12.0"

[dev-dependencies]
test-case = "3"
36 changes: 20 additions & 16 deletions arroyo-worker/src/connectors/kafka/sink/mod.rs
@@ -1,24 +1,24 @@
use anyhow::{anyhow, Result};
+use arrow_array::RecordBatch;
use arroyo_rpc::formats::Format;
use arroyo_rpc::{CheckpointEvent, ControlMessage, OperatorConfig};
use arroyo_types::*;
+use async_trait::async_trait;
use rdkafka::error::KafkaError;
use rdkafka::ClientConfig;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
-use arrow_array::RecordBatch;
-use async_trait::async_trait;

use super::{client_configs, KafkaConfig, KafkaTable, SinkCommitMode, TableType};
+use crate::engine::ArrowContext;
+use crate::operator::{ArrowOperator, ArrowOperatorConstructor, OperatorNode};
+use arroyo_formats::serialize::ArrowSerializer;
+use arroyo_rpc::grpc::api::ConnectorOp;
use arroyo_rpc::grpc::{api, TableConfig};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka_sys::RDKafkaErrorCode;
use tracing::{error, warn};
-use arroyo_formats::serialize::ArrowSerializer;
-use arroyo_rpc::grpc::api::ConnectorOp;
-use crate::engine::ArrowContext;
-use crate::operator::{ArrowOperator, ArrowOperatorConstructor, OperatorNode};

#[cfg(test)]
mod test;
@@ -53,10 +53,10 @@ impl From<SinkCommitMode> for ConsistencyMode {
}
}

-impl ArrowOperatorConstructor<api::ConnectorOp> for KafkaSinkFunc{
+impl ArrowOperatorConstructor<api::ConnectorOp> for KafkaSinkFunc {
fn from_config(config: ConnectorOp) -> Result<OperatorNode> {
let config: OperatorConfig =
-    serde_json::from_str(&config.operator).expect("Invalid config for KafkaSink");
+    serde_json::from_str(&config.config).expect("Invalid config for KafkaSink");
let connection: KafkaConfig = serde_json::from_value(config.connection)
.expect("Invalid connection config for KafkaSink");
let table: KafkaTable =
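The one-line fix in this hunk changes which ConnectorOp field is deserialized: operator names the operator, while config carries the JSON-encoded OperatorConfig. A trimmed-down sketch of why parsing the wrong field always tripped the expect (these struct definitions are hypothetical mirrors, not the real arroyo-rpc types):

```rust
use serde::Deserialize;

// Hypothetical mirrors of the real types: `operator` holds a name,
// `config` holds the JSON document the sink actually needs.
struct ConnectorOp {
    operator: String,
    config: String,
}

#[derive(Deserialize)]
struct OperatorConfig {
    connection: serde_json::Value,
    table: serde_json::Value,
}

fn main() {
    let op = ConnectorOp {
        operator: "KafkaSink".to_string(),
        config: r#"{"connection": {}, "table": {}}"#.to_string(),
    };

    // The old code parsed op.operator, a bare name rather than JSON,
    // so this deserialization always failed.
    assert!(serde_json::from_str::<OperatorConfig>(&op.operator).is_err());

    // Parsing op.config succeeds, matching the corrected line.
    let parsed: OperatorConfig =
        serde_json::from_str(&op.config).expect("Invalid config for KafkaSink");
    let _ = (parsed.connection, parsed.table);
}
```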
@@ -207,7 +207,6 @@ impl ArrowOperator for KafkaSinkFunc {
}
}


async fn handle_checkpoint(&mut self, _: CheckpointBarrier, ctx: &mut ArrowContext) {
self.flush().await;
if let ConsistencyMode::ExactlyOnce {
@@ -229,15 +228,20 @@
}

async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut ArrowContext) {
-let k = batch.project(&ctx.in_schemas[0].key_indices).unwrap();
+let values = self.serializer.serialize(&batch);

-let items = self.serializer.serialize(&k)
-    .zip(self.serializer.serialize(&batch));
+let keys = if !ctx.in_schemas[0].key_indices.is_empty() {
+    let k = batch.project(&ctx.in_schemas[0].key_indices).unwrap();

-// TODO: we can probably batch this for better performance
-for (k, v) in items {
-    self.publish(Some(k), v).await;
-}
+    // TODO: we can probably batch this for better performance
+    for (k, v) in self.serializer.serialize(&k).zip(values) {
+        self.publish(Some(k), v).await;
+    }
+} else {
+    for v in values {
+        self.publish(None, v).await;
+    }
+};
}

async fn handle_commit(
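process_batch previously projected the key columns unconditionally; for sinks with no key columns, the new code skips the projection and publishes each value with a None key, leaving partition assignment to the producer. A minimal sketch of the branching, with serialization and the Kafka producer stubbed out:

```rust
// Sketch of the keyed/unkeyed branch above; keys and values stand in
// for the serialized byte payloads.
fn process_batch(key_indices: &[usize], keys: Vec<String>, values: Vec<String>) {
    if !key_indices.is_empty() {
        // Keyed sink: pair each serialized key with its serialized value.
        for (k, v) in keys.into_iter().zip(values) {
            publish(Some(&k), &v);
        }
    } else {
        // No key columns: send a null key and let the producer's
        // partitioner pick the partition.
        for v in values {
            publish(None, &v);
        }
    }
}

fn publish(key: Option<&str>, value: &str) {
    println!("key={key:?} value={value}");
}

fn main() {
    process_batch(&[0], vec!["k1".into()], vec!["v1".into()]); // keyed
    process_batch(&[], vec![], vec!["v2".into()]); // unkeyed: key=None
}
```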
