Skip to content

Commit

Permalink
Merge pull request #423 from bytewax/prepare_0.19.0
Browse files Browse the repository at this point in the history
Prepare for v0.19.0 release
  • Loading branch information
whoahbot committed Mar 20, 2024
2 parents a138c28 + 773f3e0 commit 73d8e2f
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 4 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Expand Up @@ -9,6 +9,15 @@ For help with updating to new Bytewax versions, please see the
__Add any extra change notes here and we'll put them in the release
notes on GitHub when we make a new release.__

## v0.19.0

- Multiple operators have been reworked to avoid taking and releasing
Python's global interpreter lock while iterating over multiple items.
Windowing operators, stateful operators and operators like `branch`
will see significant performance improvements.

Thanks to @damiondoesthings for helping us track this down!

- *Breaking change* `FixedPartitionedSource.build_part`,
`DynamicSource.build`, `FixedPartitionedSink.build_part` and `DynamicSink.build`
now take an additional `step_id` argument. This argument can be used when
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "bytewax"
version = "0.18.1"
version = "0.19.0"
edition = "2021"

[lib]
Expand Down
180 changes: 180 additions & 0 deletions docs/articles/reference/migration.md
Expand Up @@ -6,6 +6,186 @@ Bytewax version to the next. For a detailed list of all changes, see
the
[CHANGELOG](https://github.com/bytewax/bytewax/blob/main/CHANGELOG.md).

## From v0.18 to v0.19

### Removal of the `builder` argument from `stateful_map`

The `builder` argument has been removed from {py:obj}`~bytewax.operators.stateful_map`.
The initial state value is always `None` and you can call
your previous builder by hand in the `mapper` function.

Before:

```python doctest:SKIP
def running_builder():
return []


def calc_running_mean(values, new_value):
values.append(new_value)
while len(values) > 3:
values.pop(0)

running_mean = sum(values) / len(values)
return (values, running_mean)


running_means = op.stateful_map(
"running_mean", keyed_amounts, running_builder, calc_running_mean
)
```

After:

```python doctest:SKIP
def calc_running_mean(values, new_value):
# On the initial value for this key, instead of the operator calling the builder for you, you can call it yourself when the state is un-initalized.
if values is None:
values = []

values.append(new_value)
while len(values) > 3:
values.pop(0)

running_mean = sum(values) / len(values)
return (values, running_mean)


running_means = op.stateful_map("running_mean", keyed_amounts, calc_running_mean)
```

### Connector API Now Contains Step ID

{py:obj}`bytewax.inputs.FixedPartitionedSource.build_part`, {py:obj}`bytewax.inputs.DynamicSource.build`, {py:obj}`bytewax.outputs.FixedPartitionedSink.build_part`
and {py:obj}`bytewax.outputs.DynamicSink.build` now take an additional `step_id` argument.
This argument can be used as a label when creating custom Python metrics.

Before:

```python doctest:SKIP
from bytewax.inputs import DynamicSource


class PeriodicSource(DynamicSource):
def build(self, now: datetime, worker_index: int, worker_count: int):
pass
```

After:

```python doctest:SKIP
from bytewax.inputs import DynamicSource


class PeriodicSource(DynamicSource):
def build(self, step_id: str, worker_index: int, worker_count: int):
pass
```

### `datetime` Arguments Removed for Performance

{py:obj}`bytewax.inputs.FixedPartitionedSource.build_part`, {py:obj}`bytewax.inputs.DynamicSource.build` and {py:obj}`bytewax.operators.UnaryLogic.on_item`
no longer take a `now: datetime` argument. {py:obj}`bytewax.inputs.StatefulSourcePartition.next_batch`, {py:obj}`bytewax.inputs.StatelessSourcePartition.next_batch`, and {py:obj}`bytewax.operators.UnaryLogic.on_notify` no longer take a `sched: datetime` argument. Generating these values resulted in significant overhead, even for the majority of sources and stateful operators that never used them.

If you need the current time, you still can manually get the current time:

```python
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
```

If you need the previously scheduled awake time, store it in an instance variable before returning it from {py:obj}`~bytewax.operators.UnaryLogic.notify_at`. Your design probably already has that stored in an instance variable.

### Standardization on Confluent's Kafka Serialization Interface

Bytewax's bespoke Kafka schema registry and serialization interface has been removed in favor of using {py:obj}`confluent_kafka.schema_registry.SchemaRegistryClient` and {py:obj}`confluent_kafka.serialization.Deserializer`s and {py:obj}`confluent_kafka.serialization.Serializer`s directly. This now gives you direct control over all of the configuration options for serialization and supports the full range of use cases. Bytewax's Kafka serialization operators in {py:obj}`bytewax.connectors.kafka.operators` now take these Confluent types.

#### With Confluent Schema Registry

If you are using Confluent's schema registry (with it's magic byte prefix), you can pass serializers like {py:obj}`confluent_kafka.schema_registry.avro.AvroDeserializer` directly to our operators. See Confluent's documentation for all the options here.

Before:

```python doctest:SKIP
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.registry import ConfluentSchemaRegistry

sr_conf = {"url": CONFLUENT_URL, "basic.auth.user.info": CONFLUENT_USERINFO}
registry = ConfluentSchemaRegistry(SchemaRegistryClient(sr_conf))

key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```

After:

```python doctest:SKIP
from bytewax.connectors.kafka import operators as kop
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Confluent's SchemaRegistryClient
client = SchemaRegistryClient(
{"url": CONFLUENT_URL, "basic.auth.user.info": CONFLUENT_USERINFO}
)
key_de = AvroDeserializer(client)
val_de = AvroDeserializer(client)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```


#### With Redpanda Schema Registry

If you are using Redpanda's schema registry or another setup for
which the serialized form does not use Confluent's wire format,
we provide compatible (de)serializer classes in
{py:obj}`bytewax.connectors.kafka.serde`.

Before:

```python doctest:SKIP
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.registry import RedpandaSchemaRegistry, SchemaRef

REDPANDA_REGISTRY_URL = "http://localhost:8080/schema-registry"

registry = RedpandaSchemaRegistry(REDPANDA_REGISTRY_URL)
key_de = registry.deserializer(SchemaRef("sensor-key"))
val_de = registry.deserializer(SchemaRef("sensor-value"))

msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```

After:

```python doctest:SKIP
from bytewax.connectors.kafka import operators as kop
from confluent_kafka.schema_registry import SchemaRegistryClient
from bytewax.connectors.kafka.serde import PlainAvroDeserializer

REDPANDA_REGISTRY_URL = os.environ["REDPANDA_REGISTRY_URL"]
# Redpanda's schema registry configuration
client = SchemaRegistryClient({"url": REDPANDA_REGISTRY_URL})

# Use plain avro instead of confluent's wire format.
# We need to specify the schema in the deserializer too here.
key_schema = client.get_latest_version("sensor-key").schema
key_de = PlainAvroDeserializer(schema=key_schema)

val_schema = client.get_latest_version("sensor-value").schema
val_de = PlainAvroDeserializer(schema=val_schema)

# Deserialize both key and value
msgs = kop.deserialize("de", kinp.oks, key_deserializer=key_de, val_deserializer=val_de)
```


## From v0.17 to v0.18

### Non-Linear Dataflows
Expand Down
7 changes: 5 additions & 2 deletions examples/wikistream.py
Expand Up @@ -10,7 +10,7 @@
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async
from bytewax.operators.window import SystemClockConfig, TumblingWindow
from bytewax.operators.window import SystemClockConfig, TumblingWindow, WindowMetadata


async def _sse_agen(url):
Expand Down Expand Up @@ -62,7 +62,10 @@ def get_server_name(data_dict):
# ("server.name", count_per_window)


def keep_max(max_count: Optional[int], new_count: int) -> Tuple[Optional[int], int]:
def keep_max(
max_count: Optional[int], new_window_count: Tuple[WindowMetadata, int]
) -> Tuple[Optional[int], int]:
_metadata, new_count = new_window_count
if max_count is None:
new_max = new_count
else:
Expand Down

0 comments on commit 73d8e2f

Please sign in to comment.