# Statistical Machine Learning on 
# Evolving Non-stationary Data Streams

## Intuition, Formalism and Examples

In [None]:
jupyter nbconvert StreamingML.ipynb --to slides --post serve


## Stream Processing

Given a sequence of data (a stream), a series of operations (functions) is applied to each element in the stream, in a declarative way, we specify what we want to achieve and not how [Bifet, 2010].
![](streaming-intuition.png)


## Stream processing

Data __Stream Learning__ is more challenging than __batch or offline learning__ [Bifet, 2010]:
* The amount of data is __extremely large__, potentially infinite - __impossible to store__ it all. 
* Only a __small summary__ can be computed and stored, and the rest is discarded - unfeasible to go over it for processing.
* The __speed of arrival is high__, so that each particular element has to be processed in __real time__, and then discarded.
* The __distribution generating the items__ can __change over time__. 
* __Data from the past__ may become __irrelevant (or even harmful)__for the current summary.

## Stream processing

![](ml-pipeline.png)

## Beyond i.i.d

* __Traditional machine learning and data mining__ assume the current observed data and the future data are assumed to be i.i.d
* __Data samples__, past and current data samples do not affect the probability for future ones. 

* Applications like:
    * web mining, 
    * social networks, 
    * network monitoring, 
    * sensor networks, 
    * telecommunications, 
    * financial forecasting, etc.
* Data samples arrive __continuously__, __online__ through unlimited streams often at __high speed, over time__
* The __process__ generating these data streams __may evolve over time__ (evolving, nonstationarity).

## Beyond i.i.d

In order to deal with evolving data streams, the model learnt from the streaming data must capture up-to-date trends and transient patterns in the stream. 

Updating the model by incorporating new examples, we must also eliminate the effects of outdated examples representing outdated concepts through one-pass.

Types of change
![](changes-types.png)

## Beyond i.i.d

In order to deal with evolving data streams, the model learnt from the streaming data must capture up-to-date trends and transient patterns in the stream. 

Typical changes in classification
![](changes-sample.png)

## Beyond i.i.d

When __training and test data__ follow __different probability distributions__, but the __conditional distributions of output__ values given input points are __unchanged__, is called the __covariate shift__ [Sugiyama et al., 2012]. The __target function__ is __unchanged__, but the __distributions__ are __different__ between the training and testing.

![](covariate-shift.png)

## Beyond i.i.d

The key idea of __covariate shift adaptation__ is to (softly) __choose informative training samples__ in a systematic way, by considering the __importance__ of each training sample in the prediction of test output values, namely the ratio

$$ \frac{p_{te}(x_i^{tr})}{p_{tr}(x_i^{tr})}$$

Basically, __the expectation__ of a function $f$ (i.e. regression) over $x_{te}$ can be computed by the __importance-weighted expectation__ of the function over $x_{tr}$. Thus, the __difference of distributions__ can be systematically __adjusted__ by importance weighting.

## Concept drift

Data generated by phenomena in __nonstationary environments__ are characterized by: 

* potentially unlimited size;
* sequential access to data samples (i.e. once an observation has been processed, it cannot be retrieved);
* unpredictable, dependent, and not identical distributed observations.

__Learning from streams of nonstationary data__ lives under the following constraints[Sayed-Mouchaweh, 2016]:
* __Random access to observations__ is __not feasible__, or it has high costs (i.e. dataset not a priori available or too large).
* __Memory is small__ with respect to the size of data.
* Data __distribution__ generating the data may __evolve over time__. This is also known as __concept drift__.

## Incremental learning

Unlike __conventional machine learning__, the __data flow__ targeted by __incremental learning__ becomes available __continuously__ over time and needs to be __processed in a single pass__.

The inherent __challenges__ here are: 

* __data availability__ 
* __model update__ 
* __data size__ 

## Incremental learning

The inherent __challenges__ here are: 

* __data availability__ - new data is received and eliminated from the window of interest as the stream evolves in time
* __model update__ - given the dynamics of the window evolution, the updates must be performed in a single pass
* __data size__ - precise models need large windows, yet updating the model for the entire window is costly in terms of latency and resource allocation (i.e. memory / disk).

## Incremental dimensionality reduction (PCA)

In a __structured form__, the basic formulation, PCA follows the following steps:

![](basic-pca.png)

## Incremental dimensionality reduction (PCA)

![](basic-pca-problems.png)

Tackle the __inherent problems in traditional PCA__ impeding it to achieve __low latency, high throughput and fixed memory/storage__:
* Calculation of the __mean and other descriptive statistics__ as the data is available.
* __Sorting the dominant eigenvalues__ in the rank update of the QR decomposition.
* Calculating the __covariance matrix__.

## Towards incremental PCA

Incremental __calculation of the mean and other descriptive statistics__ on the datastream.
![](streaming-mean.png)


## Towards incremental PCA

Incremental __updates depending on counts (i.e. histogram)__, which contain sorted eigenvalues.
![](streaming-histogram.png)

## Towards incremental PCA

Incremental __estimation of the covariance matrix__, as __neural synaptic weights__ converge to the eigenvectors (unique set of __optimal weights__ and __uncorrelated outputs__) [Axenie et al., 2019]
![](streaming-neural-covariance.png)

## Towards incremental PCA

Converge from an initially __random set of synaptic weights__ to the __eigenvectors of the input autocorrelation__ in the eigenvalues order __minimizing the linear reconstruction (i.e. using Linear Least Squares)__. 
![](streaming-lls.png)

## Example implementation

### Multi-class classification task (fault identification in predictive maintenance [Axenie et al., 2019])

We used a __real-world stream__ with 6 types of sensory readings from a coal coke prediction production line data:

![](coke.png)


## Example implementation

### Multi-class classification task (fault identification in predictive maintenance [Axenie et al., 2019])

__Goal__: __identify faults__ in the production line by __querying the eigenvalues and eigenvectors__ to extract the normal and faulty operation configuration __prior to a multi-class classifier__. 

The datastream:
* 2M incoming events at 40 kHz. 
* eigenvalues of the input $X$ are close to the class labels (i.e. $1, 2, ..., d$)
* eigenvectors are close to the canonical basis of $R^d$, where $d$ is the number of principal components to extract
* class number for the multi-class classification task (i.e. 10 classes, 9 faults and 1 normal).

## Example implementation

### Multi-class classification task (fault identification in predictive maintenance [Axenie et al., 2019])

![](experimental-setup.png)

## PCA vs. Incremental PCA (Latency Analysis)

![](latency-analysis.png)

## PCA vs. Incremental PCA (Throughput Analysis)

![](throughput-analysis-abs.png)

## PCA vs. Incremental PCA (Throughput Analysis)

![](throughput-analysis-hist.png)

## PCA vs. Incremental PCA (Accuracy Analysis)

![](accuracy-analysis.png)

## Conclusions

* __low-latency__ (1-ms level), __high-throughput__ (Kevents/s) computation and __learning on datastreams__,

* __incremental PCA model__ and leveraging __stream dimensionality reduction__ on a distributed system,

* computation of __statistical features__ and __neural learning rules__ 

* __guaranteeing limited or programmable resource__ allocation (i.e. memory and disk),

* validated in __predictive maintenance__.

# References

[Bifet, 2010] Albert Bifet - Adaptive Stream Mining Pattern Learning and Mining from Evolving Data Streams, IOS Press, 2010.

[Sugiyama et al., 2012] Masashi Sugiyama, Motoaki Kawanabe - Machine Learning in Non-Stationary Environments Introduction to Covariate Shift Adaptation-The MIT Press, 2012.

[Sayed-Mouchaweh, 2016] Moamar Sayed-Mouchaweh - Learning from Data Streams in Dynamic Environments-Springer International Publishing, 2016.

[Axenie et al., 2019] C. Axenie, Radu Tudoran, Stefano Bortoli, Mohamad Al Hajj Hassan, Alexander Wieder, Goetz Brasche, SPICE: Streaming PCA fault Identification and Classification Engine in Predictive Maintenance, IoT Stream Workshop, European Conf. on Machine Learning and Principles and Practice of Knowledge Discovery in Databases (ECML PKDD 2019).

[Weng et al., 2003] J. Weng, Y. Zhang, and W.-S. Hwang, “Candid covariance-free incremental principal component analysis,” IEEE Transactions on Pattern Analysis and Machine Intelligence, vol. 25, no. 8, pp. 1034–1040, 2003.

[Brand, 2002] M. Brand, “Incremental singular value decomposition of uncertain data with missing values.” in ECCV (1), ser. Lecture Notes in Computer Science, A. Heyden, G. Sparr, M. Nielsen, and P. Johansen, Eds., vol. 2350. Springer, 2002, pp. 707–720.

In [None]:
# Standard PCA implementation (QR-decomposition based)
# The gain vector gamma determines the weight placed on the new data in updating each principal
# component. The first coefficient of gamma corresponds to the first principal component, etc. It can
# be specified as a single positive number (which is recycled by the function) or as a vector of length
# ncol(U). For larger values of gamma, more weight is placed on x and less on U. A common choice
# for (the components of) gamma is of the form c/n, with n the sample size and c a suitable positive constant.
StreamPCAModelsState StandardPCA (StreamPCAModelsState state, SimpleMatrix x){

        // recover state
        SimpleMatrix lambda = state.getLambda();
        SimpleMatrix Q = state.getQ();

        int ind = state.getN();
        SimpleMatrix gamma = new SimpleMatrix(Q.numCols(), 1);
        for (int id = 0; id < Q.numCols(); id++) {
            gamma.set(id, 1.0);
        }
        gamma = gamma.scale(1.0 / (ind * ind));
        SimpleMatrix xbar = state.getXbar();

        // update the average
        state.setXbar(this.updateIncrementalDataMean(xbar, x, ind));
        xbar = state.getXbar();

        // for the update remove the average
        x = x.minus(xbar).transpose();

        // update the predictor
        SimpleMatrix y = Q.mult(x);

        SimpleMatrix W;
        SimpleMatrix Qupd = Q;
        SimpleMatrix evidence = (x.mult(y.transpose()));
        SimpleMatrix gammaDiag = gamma.diag();
        SimpleMatrix increment = evidence.mult(gammaDiag);
        Qupd = Qupd.plus(increment);

        // decomposition
        QRDecomposition<DMatrixRMaj> qrDecomp = DecompositionFactory_DDRM.qr(Qupd.numRows(), Qupd.numCols());
        qrDecomp.decompose(Qupd.getMatrix());
        W = SimpleMatrix.wrap(qrDecomp.getQ(null, true));

        SimpleMatrix decay = ((gamma.minus(1.0)).scale(-1.0)).elementMult(lambda);
        SimpleMatrix incrementLambda = gamma.elementMult(y).elementMult(y);
        lambda = incrementLambda.plus(decay);

        // wrap return values
        state.setLambda(lambda);
        state.setQ(W);
        return state;
    }

In [None]:
# QR decomposition

        QRDecomposition<DMatrixRMaj> alg = createQRDecomposition();

        SimpleMatrix A = new SimpleMatrix(height,width, DMatrixRMaj.class);
        RandomMatrices_DDRM.fillUniform((DMatrixRMaj)A.getMatrix(),rand);

        assertTrue(alg.decompose((DMatrixRMaj)A.copy().getMatrix()));

        int minStride = Math.min(height,width);

        SimpleMatrix Q = new SimpleMatrix(height,compact ? minStride : height, DMatrixRMaj.class);
        alg.getQ((DMatrixRMaj)Q.getMatrix(), compact);
        SimpleMatrix R = new SimpleMatrix(compact ? minStride : height,width, DMatrixRMaj.class);
        alg.getR((DMatrixRMaj)R.getMatrix(), compact);


        // see if Q has the expected properties
        assertTrue(MatrixFeatures_DDRM.isOrthogonal((DMatrixRMaj)Q.getMatrix(), UtilEjml.TEST_F64_SQ));

        // see if it has the expected properties
        DMatrixRMaj A_found = Q.mult(R).getMatrix();

        EjmlUnitTests.assertEquals((DMatrixRMaj)A.getMatrix(),A_found,UtilEjml.TEST_F64_SQ);
        assertTrue(Q.transpose().mult(A).isIdentical(R,UtilEjml.TEST_F64_SQ));

In [1]:
# Neural PCA implementation
# The vector gamma determines the weight placed on the new data in updating each eigenvector (the
# first coefficient of gamma corresponds to the first eigenvector, etc). It can be specified as a single
# positive number or as a vector of length ncol(U). Larger values of gamma place more weight on
# x and less on U. A common choice for (the components of) gamma is of the form c/n, with n the
# sample size and c a suitable positive constant.

StreamPCAModelsState NeuralPCA(StreamPCAModelsState state, SimpleMatrix x){

        // recover state
        SimpleMatrix lambda = state.getLambda();
        SimpleMatrix Q = state.getQ();
        int ind = state.getN();
        SimpleMatrix gamma = new SimpleMatrix(Q.numCols(), 1);
        for (int id = 0; id < Q.numCols(); id++) {
            gamma.set(id, 1.0);
        }
        gamma = gamma.scale(1.0 / (ind * ind));
        SimpleMatrix xbar = state.getXbar();

        // update the average
        state.setXbar(this.updateIncrementalDataMean(xbar, x, ind));
        xbar = state.getXbar();

        // for the update remove the average
        x = x.minus(xbar).transpose();

        // update the predictor
        SimpleMatrix y = Q.mult(x);

        // prepare new state
        int m = Q.numRows(), n = Q.numCols();
        SimpleMatrix gamy = gamma.elementMult(y); // Schur product
        SimpleMatrix b = Q.extractVector(false, 0).scale(y.get(0));
        SimpleMatrix A = new SimpleMatrix(m,n);
        A.setColumn(0, 0,
                (Q.extractVector(false, 0)
                        .minus(b.scale(gamy.get(0))))
                        .getDDRM()
                        .data);
        for (int i=1; i<n; i++) {
            b = b.plus(Q.extractVector(false, i).scale(y.get(i)));
            A.setColumn(i, 0,
                    (Q.extractVector(false, i)
                            .minus(b.scale(gamy.get(i))))
                            .getDDRM()
                            .data);
        }
        A = A.plus(x.mult(gamy.transpose()));
        SimpleMatrix decay = ((gamma.minus(1.0)).scale(-1.0)).elementMult(lambda);
        SimpleMatrix increment = gamma.elementMult(y).elementMult(y);
        lambda = increment.plus(decay);

        // wrap return values
        state.setLambda(lambda);
        state.setQ(A);
        return state;
    }

SyntaxError: invalid syntax (<ipython-input-1-071c2e542ee4>, line 8)