In [1]:
import river
import river.datasets as datasets
from river import stream
from pprint import pprint
import os
from scipy.io import arff
import pandas as pd
from river import tree
from river import evaluate
from river import metrics
from river.cluster.clustream import CluStream
from river.cluster.clustream import CluStreamMicroCluster
import math
from collections import defaultdict
from collections import Counter
import typing

In [2]:
class CluStreamMicroClusterWithLabel(CluStreamMicroCluster):
    def __init__(   self,   x: dict = defaultdict(float),
        labels: dict = {},
        w: float = None,
        timestamp: int = None ):
            super().__init__(x,w,timestamp)
            self.labels = labels
                
    def _add_label(self,y):
        self.labels[y] = self.labels.get(y,0)+1

    def insert(self, x,y, w, timestamp):
        self.var_time.update(timestamp, w)
        for x_idx, x_val in x.items():
            self.var_x[x_idx].update(x_val, w)
        self._add_label(y)

In [12]:

class CluserAndLabel(CluStream):
    """ 
    Extension of CluStream class
        - instead of CluStreamMicroCluster a CluStreamMicroClusterWithLabel used
        - modified  _maintain_micro_clusters, predict_one, learn_one
        - macroclusters not used

    """

    def __init__(self,
                 n_macro_clusters: int = 5,
                 max_micro_clusters: int = 100,
                 micro_cluster_r_factor: int = 2,
                 time_window: int = 1000,
                 time_gap: int = 100,
                 seed: int = None,
                 **kwargs, ):
        super().__init__(n_macro_clusters, max_micro_clusters,
                         micro_cluster_r_factor, time_window, time_gap, seed, **kwargs)
        self.micro_clusters: typing.Dict[int,
                                         CluStreamMicroClusterWithLabel] = {}

    def _merge_clusters_label_count(self, labels1, labels2):
        """
        When two clusters are merged their labels dictionary also need to be merged
        and if they have the same keys, the value need to be summed
        """
        cnt1 = Counter(labels1)
        cnt2 = Counter(labels2)
        return dict(cnt1 + cnt2)

    def _maintain_micro_clusters(self, x, w, y):
        """
        Diffrence in merging introduced
        """
        # Calculate the threshold to delete old micro-clusters
        threshold = self._timestamp - self.time_window

        # Delete old micro-cluster if its relevance stamp is smaller than the threshold
        del_id = None
        for i, mc in self.micro_clusters.items():
            if mc.relevance_stamp(self.max_micro_clusters) < threshold:
                print(i)
                del_id = i
                break

        if del_id is not None:
            self.micro_clusters[del_id] = CluStreamMicroClusterWithLabel(
                x=x,
                w=w,
                labels={y: 1},
                timestamp=self._timestamp,
            )
            return

        # Merge the two closest micro-clusters
        closest_a = 0
        closest_b = 0
        min_distance = math.inf
        for i, mc_a in self.micro_clusters.items():
            for j, mc_b in self.micro_clusters.items():
                if i <= j:
                    continue
                dist = self._distance(mc_a.center, mc_b.center)
                if dist < min_distance:
                    min_distance = dist
                    closest_a = i
                    closest_b = j

        # diffrent merging - also labels count needs to be added
        labels_merged = self._merge_clusters_label_count(self.micro_clusters[closest_a].labels,
                                                         self.micro_clusters[closest_b].labels)
        self.micro_clusters[closest_a] += self.micro_clusters[closest_b]
        self.micro_clusters[closest_a].labels = labels_merged
        self.micro_clusters[closest_b] = CluStreamMicroClusterWithLabel(
            x=x,
            w=w,
            labels={y: 1},
            timestamp=self._timestamp,
        )

    def return_microclusters(self):
        """ Method for printing the labels statistics in each microcluster"""
        for i, mc in self.micro_clusters.items():
            print(i,mc.labels)
    def sum_labels(self):
        """ Method for summing the number of stored labels -- needed for testing"""
        s = 0
        for i, mc in self.micro_clusters.items():
            s+=sum([v for v in mc.labels.values()])
        return s

    def predict_one(self, x):
        """ Prediction yields the majority class in a microcluser x belongs to """
        cluster_num = self._get_closest_mc(x)[0]
        labels = self.micro_clusters[cluster_num].labels
        return max(labels, key=labels.get)

    def learn_one(self, x, y=None, w=1):
        """ Learns y as well (orginal algorithm was an unsupervised one).
        If y not avaiable it predicts a label and assigns it as a pseudolabel
        The macroclusters are not used"""
        if y is None:
            y = self.predict_one(x)

        self._timestamp += 1

        if not self._initialized:
            self.micro_clusters[len(self.micro_clusters)] = CluStreamMicroClusterWithLabel(
                x=x,
                w=w,
                labels={y: 1},
                # When initialized, all micro clusters generated previously will have the timestamp reset to the current
                # time stamp at the time of initialization (i.e. self.max_micro_cluster - 1). Thus, the timestamp is set
                # as follows.
                timestamp=self.max_micro_clusters - 1,
            )

            if len(self.micro_clusters) == self.max_micro_clusters:
                self._initialized = True

            return self

            # Determine the closest micro-cluster with respect to the new point instance
        closest_id, closest_dist = self._get_closest_mc(x)
        closest_mc = self.micro_clusters[closest_id]

        # Check whether the new instance fits into the closest micro-cluster
        if closest_mc.weight == 1:
            radius = math.inf
            center = closest_mc.center
            for mc_id, mc in self.micro_clusters.items():
                if mc_id == closest_id:
                    continue
                distance = self._distance(mc.center, center)
                radius = min(distance, radius)
        else:
            radius = closest_mc.radius(self.micro_cluster_r_factor)

        if closest_dist < radius:
            closest_mc.insert(x, w, y, self._timestamp)
            return self

        # If the new point does not fit in the micro-cluster, micro-clusters
        # whose relevance stamps are less than the threshold are deleted.
        # Otherwise, closest micro-clusters are merged with each other.
        self._maintain_micro_clusters(x=x, w=w, y=y)
        return self


In [4]:
clustream.return_microclusters()

NameError: name 'clustream' is not defined

In [13]:
from river import cluster
from river import stream
# X = [ [1, 2],[1, 4],[1, 0],[-4, 0],[-4, 4]]

# Y = [1,1,2,3,1]
clustream = CluserAndLabel(
         max_micro_clusters=7,
         time_gap=3,
         seed=0,
         halflife=0.4 )
led_stream = datasets.synth.LED(123)
for i,(x,y) in enumerate(led_stream.take(1000)):
    if i <990:
        # if y ==7:
        #     print(clustream.return_microclusters() )
        if i %10 == 9:
            clustream = clustream.learn_one(x)
        else:
            clustream = clustream.learn_one(x = x,y = y)
        #print(clustream.sum_labels())

    else:
        print(x,clustream.predict_one(x),y)

clustream.return_microclusters() 

{0: 1, 1: 0, 2: 1, 3: 1, 4: 1, 5: 0, 6: 1} 2 2
{0: 1, 1: 1, 2: 0, 3: 1, 4: 1, 5: 1, 6: 1} 1 6
{0: 1, 1: 1, 2: 0, 3: 1, 4: 0, 5: 1, 6: 1} 1 5
{0: 1, 1: 0, 2: 1, 3: 1, 4: 0, 5: 1, 6: 1} 3 3
{0: 1, 1: 1, 2: 0, 3: 1, 4: 1, 5: 1, 6: 1} 1 6
{0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1} 1 8
{0: 1, 1: 0, 2: 1, 3: 1, 4: 1, 5: 0, 6: 1} 2 2
{0: 1, 1: 1, 2: 0, 3: 1, 4: 0, 5: 1, 6: 1} 1 5
{0: 1, 1: 1, 2: 1, 3: 1, 4: 0, 5: 1, 6: 1} 1 9
{0: 1, 1: 1, 2: 1, 3: 0, 4: 1, 5: 1, 6: 1} 1 0
0 {2: 1}
1 {3: 98, 1: 1}
2 {1: 197, 7: 1}
3 {4: 100, 1: 1}
4 {2: 100, 1: 1}
5 {6: 66, 1: 133, 5: 8, 9: 82}
6 {8: 5, 0: 79, 1: 117}


In [13]:
print(clustream.sum_labels())

40


In [None]:
stop

In [19]:
i

26

In [20]:
fligths = [x for x,y in stream.iter_arff('../datasets/airlines.arff.zip', compression='infer')]

In [21]:
f = pd.DataFrame.from_records(fligths)

In [22]:
for i, x in enumerate(stream.iter_csv('../datasets/covtype.data.gz', compression='infer')):
    pprint(len(x))


2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2


: 

: 

In [None]:
LED = datasets.synth.LEDDrift(123,0.8,False,33)


In [None]:
led_stream = datasets.synth.LEDDrift(123,0,True,7).take(1000)

In [None]:
model = tree.HoeffdingTreeClassifier(    grace_period=100,    delta=1e-5)

In [None]:
metric = metrics.Accuracy()
evaluate.progressive_val_score(led_stream,model,metric)

Accuracy: 10.71%

In [None]:

import math
import typing
from collections import defaultdict

from river import base, cluster, stats, utils


class CluStream(base.Clusterer):

    def __init__(
        self,
        n_macro_clusters: int = 5,
        max_micro_clusters: int = 100,
        micro_cluster_r_factor: int = 2,
        time_window: int = 1000,
        time_gap: int = 100,
        seed: int = None,
        **kwargs,
    ):
        super().__init__()
        self.n_macro_clusters = n_macro_clusters
        self.max_micro_clusters = max_micro_clusters
        self.micro_cluster_r_factor = micro_cluster_r_factor
        self.time_window = time_window
        self.time_gap = time_gap
        self.seed = seed

        self.kwargs = kwargs

        self.centers: typing.Dict[int, typing.DefaultDict] = {}
        self.micro_clusters: typing.Dict[int, CluStreamMicroCluster] = {}

        self._timestamp = -1
        self._initialized = False

        self._mc_centers: typing.Dict[int, typing.DefaultDict] = {}
        self._kmeans_mc = None

    def _maintain_micro_clusters(self, x, w):
        # Calculate the threshold to delete old micro-clusters
        threshold = self._timestamp - self.time_window

        # Delete old micro-cluster if its relevance stamp is smaller than the threshold
        del_id = None
        for i, mc in self.micro_clusters.items():
            if mc.relevance_stamp(self.max_micro_clusters) < threshold:
                del_id = i
                break

        if del_id is not None:
            self.micro_clusters[del_id] = CluStreamMicroCluster(
                x=x,
                w=w,
                timestamp=self._timestamp,
            )
            return

        # Merge the two closest micro-clusters
        closest_a = 0
        closest_b = 0
        min_distance = math.inf
        for i, mc_a in self.micro_clusters.items():
            for j, mc_b in self.micro_clusters.items():
                if i <= j:
                    continue
                dist = self._distance(mc_a.center, mc_b.center)
                if dist < min_distance:
                    min_distance = dist
                    closest_a = i
                    closest_b = j

        self.micro_clusters[closest_a] += self.micro_clusters[closest_b]
        self.micro_clusters[closest_b] = CluStreamMicroCluster(
            x=x,
            w=w,
            timestamp=self._timestamp,
        )

    def _get_closest_mc(self, x):
        closest_dist = math.inf
        closest_idx = -1

        for mc_idx, mc in self.micro_clusters.items():
            distance = self._distance(mc.center, x)
            if distance < closest_dist:
                closest_dist = distance
                closest_idx = mc_idx
        return closest_idx, closest_dist

    @staticmethod
    def _distance(point_a, point_b):
        return utils.math.minkowski_distance(point_a, point_b, 2)

    def learn_one(self, x, w=1.0):
        self._timestamp += 1

        if not self._initialized:
            self.micro_clusters[len(self.micro_clusters)] = CluStreamMicroCluster(
                x=x,
                w=w,
                # When initialized, all micro clusters generated previously will have the timestamp reset to the current
                # time stamp at the time of initialization (i.e. self.max_micro_cluster - 1). Thus, the timestamp is set
                # as follows.
                timestamp=self.max_micro_clusters - 1,
            )

            if len(self.micro_clusters) == self.max_micro_clusters:
                self._initialized = True

            return self

        # Determine the closest micro-cluster with respect to the new point instance
        closest_id, closest_dist = self._get_closest_mc(x)
        closest_mc = self.micro_clusters[closest_id]

        # Check whether the new instance fits into the closest micro-cluster
        if closest_mc.weight == 1:
            radius = math.inf
            center = closest_mc.center
            for mc_id, mc in self.micro_clusters.items():
                if mc_id == closest_id:
                    continue
                distance = self._distance(mc.center, center)
                radius = min(distance, radius)
        else:
            radius = closest_mc.radius(self.micro_cluster_r_factor)

        if closest_dist < radius:
            closest_mc.insert(x, w, self._timestamp)
            return self

        # If the new point does not fit in the micro-cluster, micro-clusters
        # whose relevance stamps are less than the threshold are deleted.
        # Otherwise, closest micro-clusters are merged with each other.
        self._maintain_micro_clusters(x=x, w=w)

        # Apply incremental K-Means on micro-clusters after each time_gap
        if self._timestamp % self.time_gap == self.time_gap - 1:
            # Micro-cluster centers will only be saved when the calculation of macro-cluster centers
            # is required, in order not to take up memory and time unnecessarily
            self._mc_centers = {i: mc.center for i, mc in self.micro_clusters.items()}

            self._kmeans_mc = cluster.KMeans(
                n_clusters=self.n_macro_clusters, seed=self.seed, **self.kwargs
            )
            for center in self._mc_centers.values():
                self._kmeans_mc = self._kmeans_mc.learn_one(center)

            self.centers = self._kmeans_mc.centers

        return self

    def predict_one(self, x):
        index, _ = self._get_closest_mc(x)
        try:
            return self._kmeans_mc.predict_one(self._mc_centers[index])
        except (KeyError, AttributeError):
            return 0


class CluStreamMicroCluster(base.Base):
    """Micro-cluster class."""

    def __init__(
        self,
        x: dict = defaultdict(float),
        w: float = None,
        timestamp: int = None,
    ):
        # Initialize with sample x
        self.x = x
        self.w = w
        self.timestamp = timestamp
        self.var_x = {k: stats.Var().update(x[k], w) for k in x}
        self.var_time = stats.Var().update(timestamp, w)

    @property
    def center(self):
        return {k: var.mean.get() for k, var in self.var_x.items()}

    def radius(self, r_factor):
        if self.weight == 1:
            return 0
        return self._deviation() * r_factor

    def _deviation(self):
        dev_sum = 0
        for var in self.var_x.values():
            dev_sum += math.sqrt(var.get())
        return dev_sum / len(self.var_x) if len(self.var_x) > 0 else math.inf

    @property
    def weight(self):
        return self.var_time.n

    def insert(self, x, w, timestamp):
        self.var_time.update(timestamp, w)
        for x_idx, x_val in x.items():
            self.var_x[x_idx].update(x_val, w)

    def relevance_stamp(self, max_mc):
        mu_time = self.var_time.mean.get()
        if self.weight < 2 * max_mc:
            return mu_time

        sigma_time = math.sqrt(self.var_time.get())
        return mu_time + sigma_time * self._quantile(max_mc / (2 * self.weight))

    def _quantile(self, z):
        return math.sqrt(2) * self.inverse_error(2 * z - 1)

    @staticmethod
    def inverse_error(x):
        z = math.sqrt(math.pi) * x
        res = x / 2
        z2 = z * z

        zprod = z2 * z
        res += (1.0 / 24) * zprod

        zprod *= z2  # z5
        res += (7.0 / 960) * zprod

        zprod *= z2  # z ^ 7
        res += (127 * zprod) / 80640

        zprod *= z2  # z ^ 9
        res += (4369 * zprod) / 11612160

        zprod *= z2  # z ^ 11
        res += (34807 * zprod) / 364953600

        zprod *= z2  # z ^ 13
        res += (20036983 * zprod) / 797058662400

        return res

    def __iadd__(self, other: "CluStreamMicroCluster"):
        self.var_time += other.var_time
        self.var_x = {k: self.var_x[k] + other.var_x.get(k, stats.Var()) for k in self.var_x}
        return self
Footer
© 2023 GitHub, Inc.
Footer navigation
Terms
Privacy
Security
Status
Docs
Contact GitHub
Pricing
API
Training
Blog
About
