Skip to content

Prototype which extracts stateful dataflows by analysing Python code.

Notifications You must be signed in to change notification settings

delftdata/stateflow

Repository files navigation

StateFlow | Object Oriented Code to Distributed Stateful Dataflows

CI codecov Python 3.8

StateFlow is a framework which compiles object oriented Python code to distributed stateful dataflows. These dataflows can be executed on different target systems. At the moment, we support the following runtime systems:

Runtime Local execution Cluster execution Notes
PyFlink -
Stateflow (Universalis) -
Apache Beam Beam suffers a bug with Kafka, which can be bypassed locally. Deployment in a Dataflow runner does not work.
Flink Statefun -
AWS Lambda -
CloudBurst CloudBurst has never been officially released. Due to missing Docker files and documentation, execution does not work.

An evaluation of StateFlow can be found at delftdata/stateflow-evaluation.

The stateflow runtime system can be found at delftdata/stateflow-runtime.

Features

  • Analysis and transformation of Python classes to distributed stateful dataflows. These dataflows can be ported to cloud services and dataflow systems.
  • Due to the nature of dataflow systems, stateful entities cannot directly interact with each other. Therefore, direct calls to other objects, as done in object-oriented code, does not work in stateful dataflows. StateFlow splits such functions at the AST level to get rid of the remote call. Instead, StateFlow splits a function into several parts such that the dataflow system can move back and forth between the different stateful entities (e.g. dataflow operators).
  • Support for compilation to several (cloud) services including: AWS Lambda, Apache Beam, Flink Statefun and PyFlink.
  • Support for several client-side connectivity services including: Apache Kafka, AWS Kinesis, AWS Gateway. Depending on the runtime system, a compatible client has to be used. A developer can either use StateFlow futures or asyncio to interact with the remote stateful entities.
  • Integration with FastAPI: each class function will be covered by an HTTP endpoint. Developers can easily add their own.

Walkthrough

To work with StateFlow, a developer annotates its classes with the @stateflow decorator.

from typing import List
import stateflow

@stateflow.stateflow
class Item:
    def __init__(self, item_name: str, price: int):
        self.item_name: str = item_name
        self.stock: int = 0
        self.price: int = price
        
    def set_stock(self, amount: int):
        self.stock = amount

    def __key__(self):
        return self.item_name

   
@stateflow.stateflow
class User:
    def __init__(self, username: str):
        self.username: str = username
        self.balance: int = 1

    def update_balance(self, x: int):
        self.balance += x

    def buy_item(self, amount: int, item: Item) -> bool:
        total_price = amount * item.price
   
        if self.balance < total_price:
            return False
   
        # Decrease the stock.
        decrease_stock = item.update_stock(-amount)
   
        if not decrease_stock:
            return False  # For some reason, stock couldn't be decreased.
   
        self.balance -= total_price
        return True
    
    def __key__(self):
        return self.username

Each stateful entities has to implement the key method to define the static partitioning key. This key cannot change during execution and ensures the entity is addressable in the distributed runtime. Think of this key as the primary key in databases.

To deploy this to, for example, as a Flink job simply import the annotated classes and initialize stateflow.

from demo_common import User, Item, stateflow
from stateflow.runtime.flink.pyflink import FlinkRuntime, Runtime

# Initialize stateflow
flow = stateflow.init()

runtime: FlinkRuntime = FlinkRuntime(flow)
runtime.run()

This code will generate a streaming dataflow graph compatible with Apache Flink. Finally, to interact with these stateful entities:

from demo_common import User, Item, stateflow
from stateflow.client.kafka_client import StateflowKafkaClient, StateflowClient, StateflowFuture

# Initialize stateflow
flow = stateflow.init()
client: StateflowClient = StateflowKafkaClient(
    flow, brokers="localhost:9092", statefun_mode=False
)

future_user: StateflowFuture[User] = User("new-user")
user: User = future_user.get()

user.update_balance(10).get()

Demo

To run a (full) demo:

  1. Launch a Kafka cluster
    cd deployment
    docker-compose up
    
  2. Run demo_client.py, this will start a client being able to interact with stateful entities. This will also create the appropriate Kafka topics client_request, client_reply, internal.
  3. Run demo_runtime.py, this will deploy the stateful dataflow on Apache Beam. The stateful entities are defined in demo_common.py.

Demo (with FastAPI)

  1. Launch a Kafka cluster
    cd deployment
    docker-compose up
    
  2. Run uvicorn fastapi_client:app, this will start a FastAPI client on http://localhost:8000 being able to interact with stateful entities using Kafka. To find all (generated) endpoints visit http://localhost:8000/docs. New endpoints can be added in fastapi_client.py.
  3. Run demo_runtime.py, this will deploy the stateful dataflow on Apache Beam. The stateful entities are defined in demo_common.py.

Credits

This repository is part of the research conducted at the Delft Data Management Lab.
Contributors:

Releases

No releases published

Packages

No packages published

Languages