Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 12 additions & 39 deletions sentry_streams/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions sentry_streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
pyo3 = { version = "0.24.0" }
pyo3 = { version = "0.28.2" }
serde = { version = "1.0", features = ["derive"] }
sentry_arroyo = { version = "2.38.2", features = ["ssl"] }
chrono = "0.4.40"
Expand Down Expand Up @@ -39,9 +39,6 @@ cli = ["dep:clap", "pyo3/auto-initialize"]
parking_lot = "0.12.1"
pyo3 = { version = "*", features = ["auto-initialize"] }

[build-dependencies]
pyo3-build-config = "*"

# For now (while we are still rolling out streams to SBC), let's enable maximal
# debug information. There should only be overhead in space and when many
# stacktraces are generated (thousands/sec)
Expand Down
6 changes: 6 additions & 0 deletions sentry_streams/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ description = "The python Sentry Streaming API"
readme = "README.md"

requires-python = ">=3.11"
classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
]

dependencies = [
"requests>=2.32.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ name = "runner"
path = "src/main.rs"

[dependencies]
pyo3 = { version = "0.24" }
pyo3 = { version = "0.28" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

Expand Down
4 changes: 2 additions & 2 deletions sentry_streams/src/callers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pyo3::{import_exception, prelude::*, types::PyTuple};
use pyo3::{call::PyCallArgs, import_exception, prelude::*};

import_exception!(sentry_streams.pipeline.exception, InvalidMessageError);

Expand All @@ -16,7 +16,7 @@ pub fn try_apply_py<'py, N>(
args: N,
) -> ApplyResult<Py<PyAny>>
where
N: IntoPyObject<'py, Target = PyTuple>,
N: PyCallArgs<'py>,
{
callable.call1(py, args).map_err(|py_err| {
py_err.print(py);
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/committable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn convert_py_committable(
let mut committable = BTreeMap::new();
let dict = py_committable.bind(py);
for (key, value) in dict.iter() {
let partition = key.downcast::<PyTuple>()?;
let partition = key.cast::<PyTuple>()?;
let topic: String = partition.get_item(0)?.extract()?;
let index: u16 = partition.get_item(1)?.extract()?;
let offset: u64 = value.extract()?;
Expand Down
4 changes: 2 additions & 2 deletions sentry_streams/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ mod tests {

if let PyStreamingMessage::RawMessage { ref content } = py_payload {
let payload = content.getattr(py, "payload").unwrap();
let down: &Bound<PyBytes> = payload.bind(py).downcast().unwrap();
let down: &Bound<PyBytes> = payload.bind(py).cast().unwrap();
let payload_bytes: &[u8] = down.as_bytes();
assert_eq!(payload_bytes, payload_data);
} else {
Expand All @@ -355,7 +355,7 @@ mod tests {
.getattr(py, "payload")
.unwrap()
.bind(py)
.downcast::<PyBytes>()
.cast::<PyBytes>()
.unwrap()
.as_bytes()
.to_vec();
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ macro_rules! rust_function {
let rust_msg = $crate::ffi::convert_py_message_to_rust::<$input_type>(py, &py_msg)?;

// Release GIL and call Rust function
let result_msg = py.allow_threads(|| {
let result_msg = py.detach(|| {
// clone metadata, but try very hard to avoid cloning the payload
let (payload, metadata) = rust_msg.take();
let metadata_clone = metadata.clone();
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/gcs_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn pybytes_to_bytes(message: &PyStreamingMessage, py: Python<'_>) -> PyResult<Ve
}
PyStreamingMessage::RawMessage { ref content } => {
let payload_content = content.bind(py).getattr("payload").unwrap();
let py_bytes: &Bound<PyBytes> = payload_content.downcast().unwrap();
let py_bytes: &Bound<PyBytes> = payload_content.cast().unwrap();
Ok(py_bytes.as_bytes().to_vec())
}
}
Expand Down
8 changes: 4 additions & 4 deletions sentry_streams/src/kafka_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sentry_arroyo::backends::kafka::config::KafkaConfig;
use sentry_arroyo::backends::kafka::InitialOffset as KafkaInitialOffset;
use std::collections::HashMap;

#[pyclass]
#[pyclass(from_py_object)]
#[derive(Debug, Clone, Copy, Default)]
pub enum InitialOffset {
#[default]
Expand All @@ -34,7 +34,7 @@ impl From<InitialOffset> for KafkaInitialOffset {
}
}

#[pyclass]
#[pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct OffsetResetConfig {
#[pyo3(get, set)]
Expand All @@ -55,7 +55,7 @@ impl OffsetResetConfig {
}

// Python version of the Kafka consumer configuration
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PyKafkaConsumerConfig {
bootstrap_servers: Vec<String>,
Expand Down Expand Up @@ -101,7 +101,7 @@ impl From<PyKafkaConsumerConfig> for KafkaConfig {
}
}

#[pyclass]
#[pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PyKafkaProducerConfig {
bootstrap_servers: Vec<String>,
Expand Down
14 changes: 7 additions & 7 deletions sentry_streams/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn headers_to_vec(py: Python<'_>, headers: Py<PySequence>) -> PyResult<Vec<(
.try_iter()?
.map(|item| -> PyResult<(String, Vec<u8>)> {
let tuple_i = item?;
let tuple = tuple_i.downcast::<pyo3::types::PyTuple>()?;
let tuple = tuple_i.cast::<pyo3::types::PyTuple>()?;
let key = tuple.get_item(0)?.unbind().extract(py)?;
let value: Vec<u8> = tuple.get_item(1)?.unbind().extract(py)?;
Ok((key, value))
Expand Down Expand Up @@ -333,7 +333,7 @@ pub fn into_pyraw(py: Python<'_>, message: RawMessage) -> PyResult<Py<RawMessage
/// TODO: See the TODO at the module level. This is where we would put the message
/// metadata.
#[derive(Debug)]
#[pyclass]
#[pyclass(from_py_object)]
pub enum PyStreamingMessage {
PyAnyMessage { content: Py<PyAnyMessage> },
RawMessage { content: Py<RawMessage> },
Expand Down Expand Up @@ -454,12 +454,12 @@ impl From<Py<PyAny>> for PyStreamingMessage {
traced_with_gil!(|py| {
let bound = value.clone_ref(py).into_bound(py);
if bound.is_instance_of::<PyAnyMessage>() {
let content = bound.downcast::<PyAnyMessage>()?;
let content = bound.cast::<PyAnyMessage>()?;
Ok(PyStreamingMessage::PyAnyMessage {
content: content.clone().unbind(),
})
} else if bound.is_instance_of::<RawMessage>() {
let content = bound.downcast::<RawMessage>()?;
let content = bound.cast::<RawMessage>()?;
Ok(PyStreamingMessage::RawMessage {
content: content.clone().unbind(),
})
Expand All @@ -481,16 +481,16 @@ impl TryFrom<Py<PyAny>> for WatermarkMessage {
traced_with_gil!(|py| {
let bound = value.clone_ref(py).into_bound(py);
if bound.is_instance_of::<PyWatermark>() {
let py_watermark = bound.downcast::<PyWatermark>()?;
let py_watermark = bound.cast::<PyWatermark>()?;
let py_committable = py_watermark
.getattr("committable")?
.downcast::<PyDict>()?
.cast::<PyDict>()?
.clone()
.unbind();
let committable = convert_py_committable(py, py_committable).unwrap();
let timestamp = match py_watermark
.getattr("timestamp")?
.downcast::<PyInt>()?
.cast::<PyInt>()?
.extract::<u64>()
{
Ok(time) => time,
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/metrics_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use pyo3::prelude::*;
use std::collections::HashMap;

#[pyclass]
#[pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PyMetricConfig {
host: String,
Expand Down
4 changes: 2 additions & 2 deletions sentry_streams/src/python_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ impl PythonAdapter {
/// the Python delegate.
fn handle_py_return_value(&mut self, py: Python<'_>, payloads: Vec<Py<PyAny>>) {
for py_payload in payloads {
let entry = py_payload.downcast_bound::<PyTuple>(py).unwrap();
let entry = py_payload.cast_bound::<PyTuple>(py).unwrap();
let payload: Py<PyAny> = entry.get_item(0).unwrap().unbind();
let committable: Py<PyAny> = entry.get_item(1).unwrap().unbind();
let committable_dict = committable
.downcast_bound::<PyDict>(py)
.cast_bound::<PyDict>(py)
.unwrap()
.as_unbound()
.clone_ref(py);
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
///
/// The waypoints sequence contains the branches taken by the message
/// in order following the pipeline.
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Route {
/// The name of the streaming source this route starts from.
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn to_kafka_payload(message: Message<RoutedValue>) -> Message<KafkaPayload> {
RoutedValuePayload::PyStreamingMessage(PyStreamingMessage::RawMessage { ref content }) => {
traced_with_gil!(|py| {
let payload_content = content.bind(py).getattr("payload").unwrap();
let py_bytes: &Bound<PyBytes> = payload_content.downcast().unwrap();
let py_bytes: &Bound<PyBytes> = payload_content.cast().unwrap();
let raw_bytes = py_bytes.as_bytes();
let mut headers = Headers::new();
headers = headers.insert("is_watermark", Some(vec![0]));
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub fn initialize_python() {
let python_path = std::env::var("STREAMS_TEST_PYTHONPATH").unwrap();
let python_path: Vec<_> = python_path.split(':').map(String::from).collect();

Python::with_gil(|py| -> PyResult<()> {
Python::attach(|py| -> PyResult<()> {
PyModule::import(py, "sys")?.setattr("executable", python_executable)?;
PyModule::import(py, "sys")?.setattr("path", python_path)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ where
let thread_id = thread::current().id();
let start_time = Instant::now();

Python::with_gil(|py| {
Python::attach(|py| {
let acquire_time = Instant::now().duration_since(start_time);

if acquire_time > warn_threshold {
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/tests/rust_test_functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "rust_test_functions"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.24" }
pyo3 = { version = "0.28" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
paste = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading