Skip to content

Commit

Permalink
📝 Update example (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmbmb committed Apr 11, 2023
1 parent 6ad0d92 commit 9e12c94
Showing 1 changed file with 32 additions and 5 deletions.
37 changes: 32 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,18 @@ pip install kaflow
## Example

```python
from kaflow import Json, Kaflow
from kaflow import (
FromHeader,
FromKey,
FromValue,
Json,
Kaflow,
Message,
MessageOffset,
MessagePartition,
MessageTimestamp,
String,
)
from pydantic import BaseModel


Expand All @@ -49,14 +60,30 @@ class UserClick(BaseModel):
timestamp: int


class Key(BaseModel):
environment: str


app = Kaflow(name="AwesomeKakfaApp", brokers="localhost:9092")


@app.consume(topic="user_clicks", sink_topics=("user_clicks_json",))
async def consume_user_clicks(message: Json[UserClick]) -> Json[UserClick]:
print("user click", message)
return message
@app.consume(topic="user_clicks", sink_topics=["user_clicks_json"])
async def consume_user_clicks(
message: FromValue[Json[UserClick]],
key: FromKey[Json[Key]],
x_correlation_id: FromHeader[String[str]],
x_request_id: FromHeader[String[str]],
partition: MessagePartition,
offset: MessageOffset,
timestamp: MessageTimestamp,
) -> Message:
# Do something with the message
...

# Publish to another topic
return Message(value=b'{"user_clicked": "true"}')


app.run()

```

0 comments on commit 9e12c94

Please sign in to comment.