Skip to content

Commit

Permalink
ref: Upgrade arroyo, use coarsetime (#5934)
Browse files Browse the repository at this point in the history
* ref: Upgrade arroyo, use coarsetime

Changes necessary for getsentry/arroyo#366, and
additional timers for get_str_config, since that one acquires multiple
global locks and might impact concurrency.

* bump arroyo

* update arroyo again

* remove another close

* fix tests again
  • Loading branch information
untitaker committed May 17, 2024
1 parent fa715c2 commit 35c051d
Show file tree
Hide file tree
Showing 13 changed files with 13 additions and 39 deletions.
2 changes: 1 addition & 1 deletion rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rust_snuba/bin/python_processor_infinite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ fn main() {

println!("join");

step.close();
step.join(None).unwrap();

println!("{}", output.load(Ordering::Relaxed))
Expand Down
7 changes: 4 additions & 3 deletions rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {

fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
// Commit offsets
let next_step = CommitOffsets::new(chrono::Duration::seconds(1));
let next_step = CommitOffsets::new(Duration::from_secs(1));

// Produce commit log if there is one
let next_step: Box<dyn ProcessingStrategy<BytesInsertBatch<()>>> =
Expand Down Expand Up @@ -119,8 +119,9 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
);

let accumulator = Arc::new(
|batch: BytesInsertBatch<HttpBatch>, small_batch: BytesInsertBatch<RowData>| {
batch.merge(small_batch)
|batch: BytesInsertBatch<HttpBatch>,
small_batch: Message<BytesInsertBatch<RowData>>| {
Ok(batch.merge(small_batch.into_payload()))
},
);

Expand Down
11 changes: 7 additions & 4 deletions rust_snuba/src/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use pyo3::prelude::{PyModule, Python};
use std::collections::BTreeMap;
use std::time::Duration;

use rust_arroyo::timer;
use rust_arroyo::utils::timing::Deadline;

#[allow(dead_code)]
static CONFIG: RwLock<BTreeMap<String, (Option<String>, Deadline)>> = RwLock::new(BTreeMap::new());

/// Runtime config is cached for 10 seconds
#[allow(dead_code)]
pub fn get_str_config(key: &str) -> Result<Option<String>, Error> {
let deadline = Deadline::new(Duration::from_secs(10));

Expand All @@ -21,7 +20,7 @@ pub fn get_str_config(key: &str) -> Result<Option<String>, Error> {
}
}

Python::with_gil(|py| {
let rv = Python::with_gil(|py| {
let snuba_state = PyModule::import(py, "snuba.state")?;
let config = snuba_state
.getattr("get_str_config")?
Expand All @@ -32,7 +31,11 @@ pub fn get_str_config(key: &str) -> Result<Option<String>, Error> {
.write()
.insert(key.to_string(), (config.clone(), deadline));
Ok(CONFIG.read().get(key).unwrap().0.clone())
})
});

timer!("runtime_config.get_str_config", deadline.elapsed());

rv
}

#[cfg(test)]
Expand Down
4 changes: 0 additions & 4 deletions rust_snuba/src/strategies/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@ where
self.next_step.submit(message)
}

fn close(&mut self) {
self.next_step.close()
}

fn terminate(&mut self) {
self.next_step.terminate()
}
Expand Down
4 changes: 0 additions & 4 deletions rust_snuba/src/strategies/clickhouse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ impl ProcessingStrategy<BytesInsertBatch<HttpBatch>> for ClickhouseWriterStep {
self.inner.submit(message)
}

fn close(&mut self) {
self.inner.close();
}

fn terminate(&mut self) {
self.inner.terminate();
}
Expand Down
5 changes: 0 additions & 5 deletions rust_snuba/src/strategies/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,6 @@ impl ProcessingStrategy<BytesInsertBatch<()>> for ProduceCommitLog {
self.inner.submit(message)
}

fn close(&mut self) {
self.inner.close();
}

fn terminate(&mut self) {
self.inner.terminate();
}
Expand Down Expand Up @@ -353,7 +349,6 @@ mod tests {
strategy.poll().unwrap();
}

strategy.close();
strategy.join(None).unwrap();

let produced = produced_payloads.lock().unwrap();
Expand Down
4 changes: 0 additions & 4 deletions rust_snuba/src/strategies/join_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ where
self.next_step.terminate()
}

fn close(&mut self) {
self.next_step.close();
}

fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
self.next_step.join(self.new_timeout)
}
Expand Down
2 changes: 0 additions & 2 deletions rust_snuba/src/strategies/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ impl<T> ProcessingStrategy<T> for Noop {
Ok(())
}

fn close(&mut self) {}

fn terminate(&mut self) {}

fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Expand Down
1 change: 0 additions & 1 deletion rust_snuba/src/strategies/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ mod tests {
let message = Message::new_broker_message(payload, partition, 0, Utc::now());

strategy.submit(message).unwrap(); // Does not error
strategy.close();
let _ = strategy.join(None);
}
}
4 changes: 1 addition & 3 deletions rust_snuba/src/strategies/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ impl ProcessingStrategy<KafkaPayload> for PythonTransformStep {
Ok(())
}

fn close(&mut self) {}

fn terminate(&mut self) {
self.next_step.terminate()
}
Expand Down Expand Up @@ -260,8 +258,8 @@ impl ProcessingStrategy<KafkaPayload> for PythonTransformStep {
}
}

self.next_step.close();
let next_commit = self.next_step.join(timeout)?;

Ok(merge_commit_request(
self.commit_request_carried_over.take(),
next_commit,
Expand Down
6 changes: 0 additions & 6 deletions rust_snuba/src/strategies/replacements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ impl ProcessingStrategy<InsertOrReplacement<BytesInsertBatch<RowData>>> for Prod
}
}

fn close(&mut self) {
self.inner.close();
self.next_step.close();
}

fn terminate(&mut self) {
self.inner.terminate();
self.next_step.terminate();
Expand Down Expand Up @@ -178,7 +173,6 @@ mod tests {
.unwrap();

strategy.poll().unwrap();
strategy.close();
strategy.join(None).unwrap();

assert_eq!(produced_payloads.lock().unwrap().len(), 1);
Expand Down
1 change: 0 additions & 1 deletion rust_snuba/src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ impl<R: Send + Sync> ProcessingStrategy<BytesInsertBatch<R>> for TestStrategy<R>
self.payloads.push(message.into_payload());
Ok(())
}
fn close(&mut self) {}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
Expand Down

0 comments on commit 35c051d

Please sign in to comment.