Skip to content

Commit

Permalink
erl: maybe fix?
Browse files Browse the repository at this point in the history
  • Loading branch information
alissa-tung committed Oct 9, 2022
1 parent 8690b47 commit 8ba3d03
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions src/x/hstreamdb-erl-nifs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,38 @@ fn stop_producer(producer: ResourceArc<NifAppender>) -> Atom {
ok()
}

#[rustler::nif]
fn append(
fn try_append(
producer: ResourceArc<NifAppender>,
partition_key: String,
raw_payload: String,
) -> ResourceArc<AppendResultFuture> {
raw_payload: Term,
) -> hstreamdb::Result<ResourceArc<AppendResultFuture>> {
let raw_payload = rustler::Binary::from_term(raw_payload)
.map_err(|err| hstreamdb::common::Error::BadArgument(format!("{err:?}")))?;
let record = Record {
partition_key,
payload: hstreamdb::Payload::RawRecord(raw_payload.into_bytes()),
payload: hstreamdb::Payload::RawRecord(raw_payload.to_vec()),
};
let producer = &producer.0;
let (sender, receiver) = oneshot::channel();
producer.blocking_send(Some((record, sender))).unwrap();
let receiver = receiver.blocking_recv().unwrap();
ResourceArc::new(AppendResultFuture(
Ok(ResourceArc::new(AppendResultFuture(
Mutex::new(Some(receiver)),
OnceCell::new(),
))
)))
}

#[rustler::nif]
fn append<'a>(
env: Env<'a>,
producer: ResourceArc<NifAppender>,
partition_key: String,
raw_payload: Term,
) -> Term<'a> {
match try_append(producer, partition_key, raw_payload) {
Ok(x) => (ok(), x).encode(env),
Err(err) => (error(), err.to_string()).encode(env),
}
}

#[rustler::nif]
Expand Down

0 comments on commit 8ba3d03

Please sign in to comment.