# Using Online Machine Learning on a StreamPipes data stream

In [1]:
from streampipes_client.client import StreamPipesClient
from streampipes_client.client.client_config import StreamPipesClientConfig
from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials

In [2]:
import os
# Connection settings kept in environment variables. USER and API-KEY are the
# variable names handed to `StreamPipesApiKeyCredentials.from_env` when the client
# is configured; "XXX" is a placeholder for the API key.
# NOTE(review): BROKER-HOST is presumably read by the functions broker -- confirm.
os.environ.update(
    {
        "USER": "admin@streampipes.apache.org",
        "API-KEY": "XXX",
        "BROKER-HOST": "localhost",
    }
)

In [3]:
# Build the credentials from the environment variables USER and API-KEY.
api_credentials = StreamPipesApiKeyCredentials.from_env(
    username_env="USER",
    api_key_env="API-KEY",
)

# Connection settings for the StreamPipes instance: localhost, port 8082, HTTPS disabled.
client_config = StreamPipesClientConfig(
    credential_provider=api_credentials,
    host_address="localhost",
    port=8082,
    https_disabled=True,
)
client = StreamPipesClient(client_config=client_config)

2023-01-24 09:30:03,771 - streampipes_client.client.client - [INFO] - [client.py:128] [_set_up_logging] - Logging successfully initialized with logging level INFO.


In [None]:
# Retrieve all data streams through the client and display them via `to_pandas()`.
client.dataStreamApi.all().to_pandas()

## KMeans

In [None]:
from river import cluster, compose, preprocessing
from streampipes_client.functions.machine_learning.river_function import OnlineMachineLearning
from streampipes_client.functions.utils.function_config import FunctionId

# Clustering pipeline: discard the `sensorId` and `timestamp` fields, standardise
# the remaining values, then cluster them with KMeans into two clusters.
k_means_steps = [
    ("drop_features", compose.Discard("sensorId", "timestamp")),
    ("scale", preprocessing.StandardScaler()),
    ("k_means", cluster.KMeans(n_clusters=2)),
]
k_means = compose.Pipeline(*k_means_steps)

# Identifier (name and version) under which the function is created.
clustering_function_id = FunctionId("org.examples.ClusteringFunction", 1)

# Attach the KMeans pipeline to the data stream as an online machine learning function.
clustering = OnlineMachineLearning(
    client=client,
    function_id=clustering_function_id,
    stream_ids=["sp:spdatastream:xboBFK"],
    model=k_means,
)

# Start the function.
clustering.start()

2023-01-20 10:44:33,514 - streampipes_client.endpoint.endpoint - [INFO] - [endpoint.py:165] [_make_request] - Successfully retrieved all resources.
2023-01-20 10:44:33,515 - streampipes_client.functions.function_handler - [INFO] - [function_handler.py:82] [initializeFunctions] - Using NatsBroker for <streampipes_client.functions.machine_learning.river_function.RiverFunction object at 0x00000158F7BAA0B0>


2023-01-20 10:44:33,532 - streampipes_client.functions.broker.nats_broker - [INFO] - [nats_broker.py:50] [_makeConnection] - Connected to NATS at localhost:4222
2023-01-20 10:44:33,532 - streampipes_client.functions.broker.nats_broker - [INFO] - [nats_broker.py:60] [_createSubscription] - Subscribed to stream: sp:spdatastream:xboBFK


In [None]:
# Turn learning off for the running clustering function.
# NOTE(review): presumably the model is no longer updated with new events afterwards -- confirm.
clustering.set_learning(False)

In [None]:
# Stop the clustering function; the log output shows the connection to the stream being closed.
clustering.stop()

2023-01-20 10:45:01,660 - streampipes_client.functions.broker.nats_broker - [INFO] - [nats_broker.py:70] [disconnect] - Stopped connection to stream: sp:spdatastream:xboBFK


## HoeffdingTreeRegressor

In [31]:
from IPython.display import clear_output
from river import cluster, compose, preprocessing, tree
from streampipes_client.functions.machine_learning.river_function import OnlineMachineLearning
from streampipes_client.functions.utils.function_config import FunctionId

# Regression pipeline: discard `sensorId` and `timestamp`, then feed the remaining
# fields to a Hoeffding tree regressor (second step, index 1).
hoeffding_tree_steps = [
    ("drop_features", compose.Discard("sensorId", "timestamp")),
    ("hoeffding_tree", tree.HoeffdingTreeRegressor(grace_period=5)),
]
hoeffding_tree = compose.Pipeline(*hoeffding_tree_steps)

def draw_tree(self, event, streamId):
    """Render the tree while learning; otherwise print truth and prediction.

    Intended to be passed as ``on_event`` to ``OnlineMachineLearning``.

    Args:
        self: Object providing ``learning``, ``model`` and ``target_label``.
        event: Dict of field names to values. It must contain
            ``self.target_label`` when ``self.learning`` is false. The dict
            is not modified.
        streamId: Id of the stream the event belongs to (unused).
    """
    # Replace the previous cell output instead of appending to it.
    clear_output(wait=True)
    if self.learning:
        # Index 1 is the second pipeline step, which holds the tree model.
        tree_model = self.model[1]
        # Identity comparison is the correct way to test against None.
        if tree_model.n_nodes is not None:
            tree_model.draw().render("hoeffding_tree", format="png", cleanup=True)
            print("learning")
    else:
        print(event)
        # Remove the label from a copy so the caller's event stays intact.
        features = dict(event)
        truth = features.pop(self.target_label)
        print(f"truth:      {truth}\nprediction: {self.model.predict_one(features)}")

# NOTE(review): this id still carries the "ClusteringFunction" name although the
# function performs regression -- consider giving it its own id.
regression_function_id = FunctionId("org.examples.ClusteringFunction", 1)

# Supervised online learning on the data stream: the `temperature` field is the
# target, and `draw_tree` is registered as the event callback.
regressor = OnlineMachineLearning(
    client=client,
    function_id=regression_function_id,
    stream_ids=["sp:spdatastream:xboBFK"],
    model=hoeffding_tree,
    supervised=True,
    target_label="temperature",
    on_event=draw_tree,
)

# Start the function.
regressor.start()




learning


In [None]:
# Turn learning off. The `draw_tree` callback prints the event together with its
# truth and prediction whenever `self.learning` is false.
regressor.set_learning(False)

{'mass_flow': 2.516627114228444, 'density': 45.24965930121421, 'volume_flow': 4.0280624308722395, 'sensor_fault_flags': False, 'temperature': 47.22745039785003, 'timestamp': 1674218139225, 'sensorId': 'flowrate02'}
truth:      47.22745039785003
prediction: 64.19019396258341


In [None]:
# Stop the regression function; the log output shows the connection to the stream being closed.
regressor.stop()

2023-01-20 13:35:39,544 - streampipes_client.functions.broker.nats_broker - [INFO] - [nats_broker.py:70] [disconnect] - Stopped connection to stream: sp:spdatastream:xboBFK


## DecisionTreeClassifier

In [8]:
from IPython.display import clear_output
from river import cluster, compose, preprocessing, tree
from streampipes_client.functions.machine_learning.river_function import OnlineMachineLearning
from streampipes_client.functions.utils.function_config import FunctionId

# Classification pipeline: discard `sensorId` and `timestamp`, then feed the
# remaining fields to an ExtremelyFastDecisionTreeClassifier (second step, index 1).
decision_tree_steps = [
    ("drop_features", compose.Discard("sensorId", "timestamp")),
    ("decision_tree", tree.ExtremelyFastDecisionTreeClassifier(grace_period=5)),
]
decision_tree = compose.Pipeline(*decision_tree_steps)

def draw_tree(self, event, streamId):
    """Render the tree while learning; otherwise print truth and prediction.

    Intended to be passed as ``on_event`` to ``OnlineMachineLearning``.

    Args:
        self: Object providing ``learning``, ``model`` and ``target_label``.
        event: Dict of field names to values. It must contain
            ``self.target_label`` when ``self.learning`` is false. The dict
            is not modified.
        streamId: Id of the stream the event belongs to (unused).
    """
    # Replace the previous cell output instead of appending to it.
    clear_output(wait=True)
    if self.learning:
        # Index 1 is the second pipeline step, which holds the tree model.
        tree_model = self.model[1]
        # Identity comparison is the correct way to test against None.
        if tree_model.n_nodes is not None:
            # NOTE(review): the output name is spelled "decicion_tree"; kept as is
            # because other material may reference the generated file.
            tree_model.draw().render("decicion_tree", format="png", cleanup=True)
            print("learning")
    else:
        print(event)
        # Remove the label from a copy so the caller's event stays intact.
        features = dict(event)
        truth = features.pop(self.target_label)
        print(f"truth: {truth}\nprediction: {self.model.predict_one(features)}")

# NOTE(review): this id still carries the "ClusteringFunction" name although the
# function performs classification -- consider giving it its own id.
classification_function_id = FunctionId("org.examples.ClusteringFunction", 1)

# Supervised online learning on the data stream: the `sensor_fault_flags` field is
# the target, and `draw_tree` is registered as the event callback.
classifier = OnlineMachineLearning(
    client=client,
    function_id=classification_function_id,
    stream_ids=["sp:spdatastream:xboBFK"],
    model=decision_tree,
    supervised=True,
    target_label="sensor_fault_flags",
    on_event=draw_tree,
)

# Start the function.
classifier.start()




learning


In [9]:
# Turn learning off. The `draw_tree` callback prints the event together with its
# truth and prediction whenever `self.learning` is false.
classifier.set_learning(False)

{'mass_flow': 0.21865829117248803, 'density': 40.80435751473563, 'volume_flow': 8.254557377626961, 'sensor_fault_flags': True, 'temperature': 96.69380238830203, 'timestamp': 1674217347264, 'sensorId': 'flowrate02'}
truth: True
prediction: True


In [10]:
# Stop the classification function; the log output shows the connection to the stream being closed.
classifier.stop()

2023-01-20 13:22:27,992 - streampipes_client.functions.broker.nats_broker - [INFO] - [nats_broker.py:70] [disconnect] - Stopped connection to stream: sp:spdatastream:xboBFK
