Getting the error below? <br>
urllib.error.URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1129)>
Solution:
```
$ cd "/Applications/$(python3 --version | awk '{print $2}'| awk  -F. '{print "Python " $1"."$2}')"
$ sudo "./Install Certificates.command"
```

# Imports

In [None]:
# Base
import pandas as pd

# Data
import collections
from river import datasets

from river import optim
from river import linear_model
from river import imblearn
from river import anomaly
from river import neighbors
from river import facto
from river import naive_bayes
from river import tree
from river import ensemble
from river import metrics
from river import evaluate
from river import preprocessing

from streamz import Stream
from streamz.river import RiverTrain, RiverPredict

# Data

In [None]:
# Load the credit-card dataset and report how often each label occurs.
X_y = datasets.CreditCard()

counts = collections.Counter(label for _, label in X_y)
total = sum(counts.values())

for c, count in counts.items():
    share = count / total
    print(f'{c}: {count} ({share:.5%})')


# Modify Classes

Anomaly detectors do not implement class probabilities, so we use the anomaly score instead. They only support unsupervised learning; therefore, we add `y=None` to the method signatures to cover the case where a detector is trained as part of an ensemble that passes a label.

In [None]:
class HalfSpaceTrees(anomaly.HalfSpaceTrees):
    """Half-space trees exposing classifier-style methods.

    Every method accepts an optional ``y`` and ignores it, so the detector can
    be called with a label. Predictions are taken from the parent's
    ``score_one``.
    """

    def learn_one(self, x, y=None):
        """Learn from ``x`` alone; ``y`` is not forwarded to the parent."""
        return super().learn_one(x)

    def predict_one(self, x, y=None):
        """Return the parent's anomaly score for ``x`` as the prediction."""
        return super().score_one(x)

    def predict_proba_one(self, x, y=None):
        """Return ``{False: 1 - score, True: score}`` for ``x``."""
        score = super().score_one(x)
        return {False: 1.0 - score, True: score}

In [None]:
class OneClassSVM(anomaly.OneClassSVM):
    """One-class SVM exposing classifier-style methods.

    Every method accepts an optional ``y`` and ignores it, so the detector can
    be called with a label. Predictions are taken from the parent's
    ``score_one``.
    """

    def learn_one(self, x, y=None):
        """Learn from ``x`` alone; ``y`` is not forwarded to the parent."""
        return super().learn_one(x)

    def predict_one(self, x, y=None):
        """Return the parent's anomaly score for ``x`` as the prediction."""
        return super().score_one(x)

    def predict_proba_one(self, x, y=None):
        """Return ``{False: 1 - score, True: score}`` for ``x``."""
        score = super().score_one(x)
        return {False: 1.0 - score, True: score}

In [None]:
class VotingClassifier(ensemble.VotingClassifier):
    """Voting ensemble whose probabilities cover only the two top-voted labels."""

    def predict_proba_one(self, x):
        """Aggregate the members' votes for ``x``.

        With ``use_probabilities`` set, each member contributes its
        ``predict_proba_one`` dict; otherwise it contributes a single vote of
        weight 1 for its ``predict_one`` label. The summed votes of the two
        highest-scoring labels are divided by the number of models.
        """
        if self.use_probabilities:
            ballots = (member.predict_proba_one(x) for member in self)
        else:
            ballots = ({member.predict_one(x): 1} for member in self)

        tally = collections.Counter()
        for ballot in ballots:
            tally.update(ballot)

        n_models = len(self.models)
        return {label: votes / n_models for label, votes in tally.most_common(2)}

In [None]:
# Candidate pipelines. Each one scales the features and wraps its classifier
# in a RandomUnderSampler with the same target distribution and seed.
models = [
    # A single logistic regression with a weighted log loss.
    preprocessing.StandardScaler() | imblearn.RandomUnderSampler(
        classifier=linear_model.LogisticRegression(
            loss=optim.losses.Log(weight_pos=5),
        ),
        desired_dist={0: 0.8, 1: 0.2},
        seed=42,
    ),
    # A voting ensemble of three identically configured logistic regressions.
    preprocessing.StandardScaler() | imblearn.RandomUnderSampler(
        classifier=VotingClassifier([
            linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
            linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
            linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
            # Alternative ensemble members, currently disabled:
            #neighbors.KNNClassifier(),
            #HalfSpaceTrees(),
            #anomaly.OneClassSVM(nu=0.2),
            #facto.FFMClassifier(n_factors=10,intercept=.5,seed=42,),
            #tree.HoeffdingTreeClassifier(),
            #naive_bayes.GaussianNB()
        ]),
        desired_dist={0: 0.8, 1: 0.2},
        seed=42,
    ),
]

# Validate the performance of Voting Algorithm
All three cells should produce the same result.

In [None]:
# Baseline: one weighted logistic regression behind scaling and under-sampling.
metric = metrics.ROCAUC()

model = preprocessing.StandardScaler() | imblearn.RandomUnderSampler(
    classifier=linear_model.LogisticRegression(
        loss=optim.losses.Log(weight_pos=5),
    ),
    desired_dist={0: 0.8, 1: 0.2},
    seed=42,
)

# Last expression of the cell, so the notebook displays the returned score.
evaluate.progressive_val_score(X_y, model, metric)

In [None]:
# Voting ensemble of three identically configured logistic regressions.
metric = metrics.ROCAUC()

model = preprocessing.StandardScaler() | imblearn.RandomUnderSampler(
    classifier=VotingClassifier([
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
    ]),
    desired_dist={0: 0.8, 1: 0.2},
    seed=42,
)

# Last expression of the cell, so the notebook displays the returned score.
evaluate.progressive_val_score(X_y, model, metric)


In [None]:
# Voting ensemble of three identically configured logistic regressions,
# evaluated with a hand-written predict-then-learn loop.
model = (
    preprocessing.StandardScaler() |
    imblearn.RandomUnderSampler(
      VotingClassifier([
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
      ]),
    desired_dist={0: .8, 1: .2},
    seed=42
    )
)
metric = metrics.ROCAUC()

# Evaluate the model on the training data
# Alt. 1
# evaluate.progressive_val_score(X_y, model, metric)
# Alt. 2
for x, y in X_y:
    # Predict before learning so each sample is scored by a model that has
    # not seen it yet.
    y_pred = model.predict_proba_one(x)
    # Both calls update their object in place. The return values are not
    # rebound: on river releases where `update` / `learn_one` return nothing,
    # rebinding would replace `metric` and `model` with None after the first
    # sample.
    metric.update(y, y_pred)
    model.learn_one(x, y)

metric

# Streamz Anomaly Detection

In [None]:
from river import anomaly
from streamz import Stream
from streamz.river import RiverTrain, RiverPredict

### Stream

Update RiverTrain and RiverPredict to work with supervised models

In [None]:
class RiverTrain(Stream):
    """Stream node that trains a river model on every incoming sample.

    When a metric is supplied, the model's ``predict_proba_one`` output for
    the sample is scored against the ground truth before the model learns
    from that sample, and the metric's current value is emitted downstream.
    """

    def __init__(self, model, metric=None, pass_model=False, **kwargs):
        """
        If metric and pass_model are both defaults, this is effectively
        a sink.

        :param model: river model or pipeline
        :param metric: river metric
            If given, its current value is emitted on every sample
        :param pass_model: bool
            If True, the (updated) model is emitted for each sample
        :raises TypeError: if ``metric`` is given together with ``pass_model``
        """
        super().__init__(**kwargs)
        if pass_model and metric is not None:
            raise TypeError(
                "metric and pass_model are mutually exclusive: "
                "only one value can be emitted per sample"
            )
        self.model = model
        self.pass_model = pass_model
        self.metric = metric

    def update(self, x, who=None, metadata=None):
        """
        Score (optionally) and learn from one sample, then emit.

        :param x: tuple
            (x[, y[, w]]) for a single sample: features, optional ground
            truth and optional sample weight. The whole tuple is unpacked
            into ``model.learn_one``.
        :raises RuntimeError: if a metric is set but ``x`` carries no
            ground truth
        """
        # Explicit None test, matching the check in __init__; a metric object
        # that happens to be falsy must still be updated and emitted.
        has_metric = self.metric is not None

        if has_metric:
            if len(x) < 2:
                raise RuntimeError("Metric specified but ground truth not defined. Input touple possibly (x,) not (x,y).")
            # Predict before learning so the sample is scored by a model that
            # has not seen it yet.
            yp = self.model.predict_proba_one(x[0])
            weights = x[2] if len(x) > 2 else 1.0
            self.metric.update(x[1], yp, weights)

        self.model.learn_one(*x)

        if has_metric:
            return self._emit(self.metric.get(), metadata=metadata)
        if self.pass_model:
            return self._emit(self.model, metadata=metadata)
        
        
class RiverPredict(Stream):
    """Stream node that emits the model's prediction for each incoming sample."""

    def __init__(self, model, **kwargs):
        """Keep ``model`` and hand the remaining keyword arguments to ``Stream``."""
        super().__init__(**kwargs)
        self.model = model

    def update(self, x, who=None, metadata=None):
        """Call ``predict_one`` on ``x[0]`` and emit the result downstream."""
        features = x[0]
        prediction = self.model.predict_one(features)
        return self._emit(prediction, metadata=metadata)

In [None]:
from river import preprocessing, imblearn, linear_model, optim

# Values emitted by the training node are collected here.
l = []
metric = metrics.ROCAUC()

# Voting ensemble of three identically configured logistic regressions.
model = preprocessing.StandardScaler() | imblearn.RandomUnderSampler(
    classifier=VotingClassifier([
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
        linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
    ]),
    desired_dist={0: 0.8, 1: 0.2},
    seed=42,
)

s = Stream()
# Training branch: wrap non-tuple items into a 1-tuple, then train.
s_train = RiverTrain(model, metric=metric)
s.map(lambda x: x if isinstance(x, tuple) else (x,)).connect(s_train)
# Example DataFrame
ex = pd.DataFrame({'x': [0.5], 'y': [0.5]})
# Prediction branch, fed directly from the source and sharing the same model.
s_pred = RiverPredict(model)

t = s.connect(s_pred)
#p = s_pred.sink(print)

r = s_train.sink(l.append)

In [None]:
# Draw the node graph of stream `s` (source, map, train and predict branches).
s.visualize()

### Evaluate

In [None]:
# Push every (features, label) pair of the dataset through the stream.
for x, y in X_y:
    s.emit((x,y))
# Last expression of the cell, so the notebook displays the metric.
metric

# Unsupervised learning

In [None]:
class OneClassSVM(anomaly.OneClassSVM):
    """One-class SVM exposing classifier-style methods.

    Every method accepts an optional ``y`` and ignores it; predictions are
    taken from the parent's ``score_one``.
    """

    def learn_one(self, x, y=None):
        """Learn from ``x`` alone; ``y`` is not forwarded to the parent."""
        return super().learn_one(x)

    def predict_one(self, x, y=None):
        """Return the parent's anomaly score for ``x`` as the prediction."""
        return super().score_one(x)

    def predict_proba_one(self, x, y=None):
        """Return ``{False: 1 - score, True: score}`` for ``x``."""
        score = super().score_one(x)
        return {False: 1.0 - score, True: score}


model = OneClassSVM()

In [None]:
from river import preprocessing, imblearn, linear_model, optim

# Voting ensemble made of the two wrapped anomaly detectors, behind scaling
# and under-sampling.
model = (
    preprocessing.StandardScaler() |
    imblearn.RandomUnderSampler(
        VotingClassifier([
            #linear_model.LogisticRegression(loss=optim.losses.Log(weight_pos=5)),
            HalfSpaceTrees(),
            OneClassSVM(),
        ]),
        desired_dist={0: .8, 1: .2},
        seed=42,
    )
)

metric = metrics.ROCAUC()

# Start from an empty result list, as the other stream cells do. Without this
# the sink below appends to whatever an earlier cell left in `l` (or raises a
# NameError when that cell has not been run).
l = []
s = Stream()
# Stream pipeline for model's training
s_train = RiverTrain(model, metric=metric)
# Process training data for model's training: wrap non-tuple items in a 1-tuple
s.map(lambda x: (x,) if not isinstance(x, tuple) else x).connect(s_train)
# Stream pipeline for model's prediction
s_pred = RiverPredict(model)

t = s.connect(s_pred)
#p = s_pred.sink(print)

r = s_train.sink(l.append)

In [None]:
# Draw the node graph of stream `s`.
s.visualize()

In [None]:
# Push every (features, label) pair of the dataset through the stream.
for x, y in X_y:
    s.emit((x, y))
# Last expression of the cell, so the notebook displays the metric.
metric

# Coffee Machine

In [None]:
from coffee_queries import query_coffee_history, process_coffee

In [None]:
# MQTT connection settings; they are handed to Stream.from_mqtt further down.
broker = "mqtt.cloud.uiam.sk"
port = 1883
# Only topics[0] is used by the stream built in this notebook.
topics = ["shellies/Shelly_Kitchen-C_CoffeMachine/relay/0/power"]

In [None]:
# Result of the project helper from coffee_queries; a later cell iterates over
# it and emits each item into the stream.
# NOTE(review): presumably stored past power readings -- confirm in coffee_queries.
coffee_history = query_coffee_history()

In [None]:
class ThresholdFilter(anomaly.ThresholdFilter):
    """Threshold filter whose prediction classifies the raw ``'power'`` feature."""

    def predict_one(self, x):
        """Return the parent's ``classify`` result for ``x['power']``."""
        power = x['power']
        return super().classify(power)

In [None]:
from river import preprocessing, imblearn, linear_model, optim

# One-class SVM wrapped in the power-based threshold filter.
model = ThresholdFilter(OneClassSVM(), threshold=200)

# Values emitted by the training node are collected here.
l = []

# Source: MQTT subscription to the first configured topic.
s = Stream.from_mqtt(broker, port, topics[0])
# Shared preprocessing: convert the message, then wrap non-tuples in a 1-tuple.
proc = s.map(process_coffee).map(lambda x: x if isinstance(x, tuple) else (x,))

# Training branch; the model itself is emitted after every sample.
s_train = RiverTrain(model, pass_model=True)
proc.connect(s_train)
r = s_train.sink(l.append)

# Prediction branch on the same model; predictions are printed.
s_pred = RiverPredict(model)
proc.connect(s_pred)
s_pred.sink(print)

In [None]:
# Replay the items of coffee_history through the stream, one at a time.
for x in coffee_history:
    s.emit(x)

In [None]:
# Start the stream's source (created with Stream.from_mqtt).
s.start()

In [None]:
# Stop the stream's source again.
s.stop()

In [None]:
# Draw the node graph of stream `s`.
s.visualize()

# Drift Detection
Drift detectors can be applied to scalar values only, so a single feature is fed to the detector.

In [None]:
from river import drift

adwin = drift.ADWIN()

# Update drift detector and verify if change is detected.
# The dataset yields (features, label) pairs, so `enumerate` supplies the
# sample index that the message reports; the label is not used here.
for i, (x, _) in enumerate(X_y):
    # ADWIN works on a single scalar: feed it the second feature of the sample.
    val = list(x.values())[1]
    adwin.update(val)
    if adwin.drift_detected:
        print(f"Change detected at index {i}, input value: {val}")

## Plot Credit Card dataset

In [None]:
import numpy as np
import pandas as pd
import plotly.express as px


In [None]:
# Reload the credit-card dataset and report how often each label occurs.
X_y = datasets.CreditCard()

counts = collections.Counter(label for _, label in X_y)
total = sum(counts.values())

for c, count in counts.items():
    share = count / total
    print(f'{c}: {count} ({share:.5%})')


In [None]:
# Materialise the whole dataset as a frame with one row per dataset item.
# The plain constructor is used because the input is a list of tuples, not a
# dict (which is what DataFrame.from_dict is meant for); the resulting frame
# is the same.
df = pd.DataFrame(list(X_y.take(X_y.n_samples)))

In [None]:
# Column 0 holds the first element of each dataset item (the feature dict).
# Expand those dicts into their own columns, attach them to the frame and
# drop the original dict column.
df = df.join(pd.DataFrame(df[0].values.tolist())).drop(columns=0)

In [None]:
# Distinct values of column 1 within a random sample of 1000 rows.
# NOTE(review): column 1 is presumably the label (second element of each
# dataset item) -- confirm.
df.sample(1000)[1].unique()

In [None]:
# Show the rows whose 'Time' value equals 102572.0.
df[df['Time'] == 102572.0]

In [None]:
# Scatter plot of a random sample of 10000 rows.
px.scatter(df.sample(10000))

In [None]:
# One-class SVM wrapped in a quantile filter.
model = anomaly.QuantileFilter(
    anomaly.OneClassSVM(nu=0.002),
    q=0.995
)

auc = metrics.ROCAUC()

# One boolean per processed sample, in dataset order.
anomalies = []
for i, (x, y) in enumerate(datasets.CreditCard().take(160000)):
    # Only samples with index 100001..159999 are scored and learned from.
    if i <= 100000:
        continue

    # Score and classify before learning from the sample.
    score = model.score_one(x)
    is_anomaly = model.classify(score)
    # Both calls update their object in place. The return values are not
    # rebound: on river releases where `learn_one` / `update` return nothing,
    # rebinding would replace `model` and `auc` with None after one sample.
    model.learn_one(x)
    auc.update(y, is_anomaly)

    anomalies.append(is_anomaly)

auc

In [None]:
from plotly import graph_objects as go

In [None]:
# `anomalies` holds one flag per scored sample, i.e. for dataset rows
# 100001..159999 (the scoring loop skips i <= 100000 and stops after 160000
# samples). The mask is therefore applied to that slice of the frame; applied
# to the whole frame its length would not match.
first_scored_row = 100001
df.iloc[first_scored_row:first_scored_row + len(anomalies)][anomalies]

In [None]:
fig = go.Figure()

# Feature V12 over the row index; the legend entry names the plotted column.
fig.add_trace(go.Scatter(
    x=df.index, y=df['V12'],
    line_color='rgb(0,140,120)',
    name='V12',
))

# `anomalies` holds one boolean per scored sample, starting at dataset row
# 100001 (the scoring loop skips i <= 100000). Draw a marker at the row
# position of every flagged sample; passing the booleans themselves as the x
# position would stack all lines at x = 0 and x = 1.
first_scored_row = 100001
for offset, flagged in enumerate(anomalies):
    if flagged:
        fig.add_vline(first_scored_row + offset,
                      line_color='rgba(100,0,0,0.5)', line_dash="dash")

# The x axis is the frame's row index, so it is labelled as such.
fig.update_layout(
    xaxis_title="Sample index",
    showlegend=True)

fig.show()

In [None]:
def collect(x):
    """Placeholder for the collection stage: hand ``x`` back unchanged."""
    return x
def get_features(x):
    """Placeholder for feature extraction: hand ``x`` back unchanged."""
    return x
def get_truth(x):
    """Placeholder for ground-truth lookup: hand ``x`` back unchanged."""
    return x
def preprocess(x):
    """Placeholder for preprocessing: hand ``x`` back unchanged."""
    return x
def predict(x):
    """Placeholder for the prediction stage: hand ``x`` back unchanged."""
    return x
def learn(x):
    """Placeholder for the learning stage: hand ``x`` back unchanged."""
    return x
def evaluate(x):
    """Placeholder for the evaluation stage: hand ``x`` back unchanged.

    NOTE: this name shadows the ``evaluate`` module imported from river at the
    top of the notebook; cells that call ``evaluate.progressive_val_score``
    fail once this definition has been executed.
    """
    return x
def store(x):
    """Placeholder for the storage stage: hand ``x`` back unchanged."""
    return x

In [None]:
# Skeleton pipeline: emitted items pass through the collect and preprocess
# stubs and are then printed.
s = Stream()
s.map(collect).map(preprocess).sink(print)