Skip to content

Commit

Permalink
consumer: init (#1)
Browse files Browse the repository at this point in the history
* consumer: init

* client: subscription

* consumer: init test
  • Loading branch information
alissa-tung committed Aug 28, 2022
1 parent fa849df commit 26741d9
Show file tree
Hide file tree
Showing 10 changed files with 415 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/hstreamdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ prost-types = "0.11.1"

prost = "0.11.0"
tokio = { version = "1.20.1", features = ["rt-multi-thread"] }
tokio-stream = "0.1.9"
tonic = "0.8.0"
url = "2.2.2"

Expand Down
123 changes: 120 additions & 3 deletions src/hstreamdb/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use common::Stream;
use hstreamdb_pb::h_stream_api_client::HStreamApiClient;
use hstreamdb_pb::{CompressionType, DeleteStreamRequest, ListStreamsRequest, NodeState};
use hstreamdb_pb::{
CompressionType, DeleteStreamRequest, DeleteSubscriptionRequest, ListStreamsRequest,
ListSubscriptionsRequest, NodeState, Subscription,
};
use tonic::transport::Channel;
use tonic::Request;
use url::Url;
Expand All @@ -10,7 +13,7 @@ use crate::producer::Producer;
use crate::{common, format_url, producer};

pub struct Client {
    // Raw gRPC client; `pub(crate)` so `consumer.rs` can open the
    // bidirectional streaming-fetch call on it directly.
    pub(crate) hstream_api_client: HStreamApiClient<Channel>,
    // Scheme ("http"/"https") reused when building per-node URLs —
    // presumably from the URL passed to `Client::new`; TODO confirm.
    url_scheme: String,
    // Cluster node addresses discovered at startup; currently unused
    // (leading underscore), kept for future node routing.
    _available_node_addrs: Vec<String>,
}
Expand Down Expand Up @@ -93,6 +96,39 @@ impl Client {
}
}

impl Client {
    /// Create `subscription` on the server; propagates any gRPC error.
    pub async fn create_subscription(&mut self, subscription: Subscription) -> common::Result<()> {
        let _ = self
            .hstream_api_client
            .create_subscription(subscription)
            .await?;
        Ok(())
    }

    /// Delete the subscription named `subscription_id`.
    /// `force` asks the server to delete even if consumers are attached.
    pub async fn delete_subscription(
        &mut self,
        subscription_id: String,
        force: bool,
    ) -> common::Result<()> {
        let request = DeleteSubscriptionRequest {
            subscription_id,
            force,
        };
        let _ = self.hstream_api_client.delete_subscription(request).await?;
        Ok(())
    }

    /// List every subscription known to the server.
    pub async fn list_subscriptions(&mut self) -> common::Result<Vec<Subscription>> {
        let response = self
            .hstream_api_client
            .list_subscriptions(ListSubscriptionsRequest {})
            .await?;
        Ok(response.into_inner().subscription)
    }
}

impl Client {
pub async fn new_producer(
&mut self,
Expand Down Expand Up @@ -120,7 +156,7 @@ impl Client {
mod tests {
use std::env;

use hstreamdb_pb::Stream;
use hstreamdb_pb::{SpecialOffset, Stream, Subscription};
use hstreamdb_test_utils::rand_alphanumeric;

use super::Client;
Expand Down Expand Up @@ -157,4 +193,85 @@ mod tests {
assert!(!listed_streams.contains(&stream));
}
}

    #[tokio::test(flavor = "multi_thread")]
    async fn test_subscription_cld() {
        // End-to-end create/list/delete ("cld") round-trip for subscriptions,
        // run against a live HStreamDB server at `TEST_SERVER_ADDR`.
        let addr = env::var("TEST_SERVER_ADDR").unwrap();
        let mut client = Client::new(addr).await.unwrap();

        // Fixture factory: single-shard stream with a 30-minute backlog.
        let make_stream = |stream_name| Stream {
            stream_name,
            replication_factor: 1,
            backlog_duration: 30 * 60,
            shard_count: 1,
        };
        // Ten randomly named streams to host the subscriptions under test.
        let streams = (0..10)
            .map(|_| make_stream(format!("stream-{}", rand_alphanumeric(10))))
            .collect::<Vec<_>>();
        for stream in streams.iter() {
            client.create_stream(stream.clone()).await.unwrap()
        }

        // Sanity check: every created stream is visible via list.
        let listed_streams = client.list_streams().await.unwrap();
        for stream in streams.iter() {
            assert!(listed_streams.contains(stream));
        }

        // Fixture factory: subscription reading from the earliest offset.
        let make_subscription = |subscription_id, stream_name| Subscription {
            subscription_id,
            stream_name,
            ack_timeout_seconds: 60 * 10,
            max_unacked_records: 1000,
            offset: SpecialOffset::Earliest as i32,
        };
        for stream in streams.iter() {
            // Five random subscriptions per stream.
            let subscription_ids = (0..5)
                .map(|_| format!("subscription-{}", rand_alphanumeric(10)))
                .collect::<Vec<_>>();
            for subscription_id in subscription_ids.iter() {
                client
                    .create_subscription(make_subscription(
                        subscription_id.clone(),
                        stream.stream_name.clone(),
                    ))
                    .await
                    .unwrap()
            }
            // Each created subscription must appear in the listing, then be
            // force-deleted.
            let subscriptions = client
                .list_subscriptions()
                .await
                .unwrap()
                .into_iter()
                .map(|x| x.subscription_id)
                .collect::<Vec<_>>();
            for subscription_id in subscription_ids.iter() {
                assert!(subscriptions.contains(subscription_id));
                client
                    .delete_subscription(subscription_id.clone(), true)
                    .await
                    .unwrap();
            }
            // After deletion, none of them may appear in a fresh listing.
            let subscriptions = client
                .list_subscriptions()
                .await
                .unwrap()
                .into_iter()
                .map(|x| x.subscription_id)
                .collect::<Vec<_>>();
            for subscription_id in subscription_ids.iter() {
                assert!(!subscriptions.contains(subscription_id));
            }
        }

        // Cleanup: drop all fixture streams and verify they are gone.
        for stream in streams.iter() {
            client
                .delete_stream(stream.stream_name.clone(), false, true)
                .await
                .unwrap();
        }
        let listed_streams = client.list_streams().await.unwrap();
        for stream in streams {
            assert!(!listed_streams.contains(&stream));
        }
    }
}
4 changes: 3 additions & 1 deletion src/hstreamdb/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io;

pub use hstreamdb_pb::Stream;
use hstreamdb_pb::StreamingFetchRequest;
pub use hstreamdb_pb::{Stream, Subscription};
use num_bigint::ParseBigIntError;
use tonic::transport;

Expand All @@ -11,6 +12,7 @@ pub enum Error {
CompressError(io::Error),
ParseUrlError(url::ParseError),
PartitionKeyError(PartitionKeyError),
StreamingFetchInitError(tokio::sync::mpsc::error::SendError<StreamingFetchRequest>),
}

#[derive(Debug)]
Expand Down
182 changes: 182 additions & 0 deletions src/hstreamdb/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use std::io::Write;

use flate2::write::GzDecoder;
use hstreamdb_pb::{
BatchHStreamRecords, BatchedRecord, HStreamRecord, StreamingFetchRequest,
StreamingFetchResponse,
};
use prost::Message;
use prost_types::Struct;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Streaming};

use crate::client::Client;
use crate::common::{self, Payload};

impl Client {
    /// Open a streaming-fetch session on `subscription_id`.
    ///
    /// Sends one initial `StreamingFetchRequest` (empty `ack_ids`) to
    /// register `consumer_name` with the server, then spawns a background
    /// task (`fetching`) that forwards every received record — paired with
    /// a one-shot ack callback — into the returned stream.
    pub async fn streaming_fetch(
        &mut self,
        consumer_name: String,
        subscription_id: String,
    ) -> common::Result<UnboundedReceiverStream<(Payload, AckFn)>> {
        // Initial handshake message; later messages on the same channel
        // carry acks (see the `AckFn` built in `process_streaming_fetch_response`).
        let request = StreamingFetchRequest {
            subscription_id: subscription_id.clone(),
            consumer_name: consumer_name.clone(),
            ack_ids: Vec::new(),
        };
        // Request side of the bidirectional gRPC stream: everything pushed
        // into `request_sender` is sent to the server.
        let (request_sender, request_receiver) =
            tokio::sync::mpsc::unbounded_channel::<StreamingFetchRequest>();
        let request_stream = UnboundedReceiverStream::new(request_receiver);
        let response = self
            .hstream_api_client
            .streaming_fetch(Request::new(request_stream))
            .await?
            .into_inner();
        // Queue the handshake; unbounded send only fails if the receiver is
        // gone, surfaced as `StreamingFetchInitError`.
        request_sender
            .send(request)
            .map_err(common::Error::StreamingFetchInitError)?;

        // Channel handed to the caller: (decoded payload, ack callback).
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(Payload, AckFn)>();

        // Detached task; it ends when the server closes the response stream
        // or an error occurs (logged inside `fetching`).
        let _ = tokio::spawn(fetching(
            consumer_name,
            subscription_id,
            request_sender,
            response,
            sender,
        ));

        Ok(UnboundedReceiverStream::new(receiver))
    }
}

/// Background loop: drain the server's response stream, handing each
/// response to `process_streaming_fetch_response` until the stream ends
/// (clean close) or errors (logged, then stop).
async fn fetching(
    consumer_name: String,
    subscription_id: String,
    ack_sender: UnboundedSender<StreamingFetchRequest>,
    mut init_response: Streaming<StreamingFetchResponse>,
    fetch_stream: UnboundedSender<(Payload, AckFn)>,
) {
    loop {
        // Pull the next response; a transport/status error ends the loop.
        let message = match init_response.message().await {
            Ok(message) => message,
            Err(err) => {
                log::error!("streaming fetch error: {err}");
                break;
            }
        };
        // `None` means the server closed the stream cleanly.
        let Some(message) = message else { return };
        process_streaming_fetch_response(
            consumer_name.clone(),
            subscription_id.clone(),
            message,
            ack_sender.clone(),
            fetch_stream.clone(),
        );
    }
}

type AckFn = Box<dyn FnOnce() -> Result<(), SendError<StreamingFetchRequest>> + Send>;

fn process_streaming_fetch_response(
consumer_name: String,
subscription_id: String,
message: StreamingFetchResponse,
ack_sender: UnboundedSender<StreamingFetchRequest>,
fetch_stream: UnboundedSender<(Payload, AckFn)>,
) {
match message.received_records {
None => {
log::warn!("streaming fetch error: failed to unwrap `received_records`");
}
Some(received_records) => match decode_received_records(
received_records.record,
received_records.record_ids.is_empty(),
) {
Err(err) => {
log::error!("decode received records error: {err}")
}
Ok(records) => {
for (record, record_id) in records.into_iter().zip(received_records.record_ids) {
let record = match record.header {
None => {
log::error!(
"process streaming fetch response error: failed to unwrap record header"
);
return;
}
Some(header) => match header.flag() {
hstreamdb_pb::h_stream_record_header::Flag::Raw => {
Payload::RawRecord(record.payload)
}
hstreamdb_pb::h_stream_record_header::Flag::Json => {
match Struct::decode(record.payload.as_slice()) {
Err(err) => {
log::error!("decode HRecord error: {err}");
return;
}
Ok(payload) => Payload::HRecord(payload),
}
}
},
};
let ack_sender = ack_sender.clone();
let subscription_id = subscription_id.clone();
let consumer_name = consumer_name.clone();
let ack_fn: AckFn = box (move || {
ack_sender.send(StreamingFetchRequest {
subscription_id,
consumer_name,
ack_ids: vec![record_id],
})
});
match fetch_stream.send((record, ack_fn)) {
Ok(()) => (),
Err(err) => {
log::error!("send to fetch stream error: {err}")
}
}
}
}
},
}
}

/// Decompress (if needed) and decode a `BatchedRecord` into its
/// `HStreamRecord`s. `is_empty` reflects whether the batch's record-id
/// list was empty, in which case there is nothing to decode.
/// Errors are rendered as `String`s for logging by the caller.
fn decode_received_records(
    received_records: Option<BatchedRecord>,
    is_empty: bool,
) -> Result<Vec<HStreamRecord>, String> {
    if is_empty {
        return Ok(Vec::new());
    }
    let record = received_records.ok_or_else(|| {
        "decode received records error: failed to unwrap batched record".to_string()
    })?;
    // Read the compression flag before moving the payload out.
    let compression_type = record.compression_type();
    let payload = match compression_type {
        hstreamdb_pb::CompressionType::None => record.payload,
        hstreamdb_pb::CompressionType::Gzip => {
            // GzDecoder over a Vec sink: write compressed bytes in,
            // `finish` yields the decompressed buffer.
            let mut decoder = GzDecoder::new(Vec::new());
            decoder
                .write_all(&record.payload)
                .map_err(|err| err.to_string())?;
            decoder.finish().map_err(|err| err.to_string())?
        }
    };
    let batch = BatchHStreamRecords::decode(payload.as_slice()).map_err(|err| err.to_string())?;
    Ok(batch.records)
}
4 changes: 4 additions & 0 deletions src/hstreamdb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#![feature(try_blocks)]
#![feature(box_syntax)]

pub mod appender;
pub mod client;
pub mod common;
pub mod consumer;
pub mod producer;
pub mod utils;

pub use common::{Error, Record, Result, Stream, Subscription};
2 changes: 1 addition & 1 deletion src/hstreamdb/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async fn flush(
)
.await
.map_err(|err| format!("producer append error: addr = {server_node}, {err:?}"))
.map(|x| log::debug!("append successed: len = {}", x.len()))?;
.map(|x| log::debug!("append succeed: len = {}", x.len()))?;
Ok(())
}
}
Expand Down
Loading

0 comments on commit 26741d9

Please sign in to comment.