-
Notifications
You must be signed in to change notification settings - Fork 523
Closed
Labels
Milestone
Description
What happened
Fluvio consumer fails after receiving about 2k records when using a map Smartmodule.
Describe the setup
- Fluvio 0.9.27 and 0.9.28
- Also Fluvio Cloud
How to reproduce
Create a Rust Fluvio consumer app:
use anyhow;
use fluvio::{Offset, FluvioError, RecordKey};
use fluvio::consumer::{SmartModuleKind, SmartModuleInvocation, SmartModuleInvocationWasm};
use futures::StreamExt;
use serde::Deserialize;
use tokio::time::{sleep, Duration};
const FLUVIO_TOPIC: &str = "mytopic";
const FLUVIO_PARTITION: i32 = 0;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = fluvio::consumer::ConsumerConfig::builder()
.smartmodule(Some(SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::Predefined("mapper".to_owned()),
kind: SmartModuleKind::Map,
..Default::default()
}))
.build().unwrap();
let consumer = fluvio::consumer(FLUVIO_TOPIC, FLUVIO_PARTITION).await?;
let mut stream = consumer.stream_with_config(Offset::beginning(), config).await?;
while let Some(Ok(record)) = stream.next().await {
match serde_json::from_slice::<MyRecord>(record.value()) {
Ok(my_record) => println!("{}", my_record.serial),
Err(err) => println!("{}", err)
}
sleep(Duration::from_millis(10)).await;
}
Ok(())
}
#[derive(Deserialize, Debug)]
struct MyRecord {
serial: i64,
data0: String,
data1: String,
data2: String,
data3: String,
data4: String,
data5: String,
data6: String,
data7: String,
}Create a map SmartModule named mapper:
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use serde::{Deserialize, Serialize};
#[smartmodule(map)]
pub fn invoice_transformation_map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
let input = serde_json::from_slice::<Input>(record.value.as_ref())?;
let output = Output::from(input);
let serialized_output = serde_json::to_vec(&output)?;
Ok((None, RecordData::from(serialized_output)))
}
#[derive(Deserialize)]
struct Input {
serial: i64,
data0: String,
data1: String,
data2: String,
data3: String,
data4: String,
data5: String,
data6: String,
data7: String
}
#[derive(Serialize)]
struct Output {
serial: i64,
data0: String,
data1: String,
data2: String,
data3: String,
data4: String,
data5: String,
data6: String,
data7: String
}
impl From<Input> for Output {
fn from(input: Input) -> Self {
let serial = input.serial.to_owned();
let data0 = input.data0.to_owned();
let data1 = input.data1.to_owned();
let data2 = input.data2.to_owned();
let data3 = input.data3.to_owned();
let data4 = input.data4.to_owned();
let data5 = input.data5.to_owned();
let data6 = input.data6.to_owned();
let data7 = input.data7.to_owned();
Self {
serial,
data0,
data1,
data2,
data3,
data4,
data5,
data6,
data7,
}
}
}Populate mytopic with a few thousand of there with large strings:
{
"serial": 0,
"data0": "<random data>",
"data1": "<random data>",
"data2": "<random data>",
"data3": "<random data>",
"data4": "<random data>",
"data5": "<random data>",
"data6": "<random data>",
"data7": "<random data>",
}Run the Fluvio consumer app. The app will show it stalling after a few thousand records.