In [7]:
import numpy as np
import pandas as pd

from dask.distributed import Client, progress
from sklearn.metrics import roc_auc_score
import project_path
from src.util.generate_connected_graph import generate_connected_graph
from src.util.generate_lr_data import generate_low_rank_data
from src.util.generate_anomaly import generate_spatio_temporal_anomaly
from src.algos.lr_stss import lr_stss

In [2]:
# Start a local dask cluster: 6 worker processes, each single-threaded, so that
# each experiment task runs on its own worker.
client = Client(
    n_workers=6,
    threads_per_worker=1,
)

In [3]:
# Display the cluster summary (dashboard URL, workers, threads, memory).
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 6,Total memory: 15.73 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:63184,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 6
Started: Just now,Total memory: 15.73 GiB

0,1
Comm: tcp://127.0.0.1:63213,Total threads: 1
Dashboard: http://127.0.0.1:63215/status,Memory: 2.62 GiB
Nanny: tcp://127.0.0.1:63187,
Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-numbvww8,Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-numbvww8

0,1
Comm: tcp://127.0.0.1:63225,Total threads: 1
Dashboard: http://127.0.0.1:63226/status,Memory: 2.62 GiB
Nanny: tcp://127.0.0.1:63188,
Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-_4kekoyq,Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-_4kekoyq

0,1
Comm: tcp://127.0.0.1:63214,Total threads: 1
Dashboard: http://127.0.0.1:63217/status,Memory: 2.62 GiB
Nanny: tcp://127.0.0.1:63189,
Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-vv1suqao,Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-vv1suqao

0,1
Comm: tcp://127.0.0.1:63220,Total threads: 1
Dashboard: http://127.0.0.1:63223/status,Memory: 2.62 GiB
Nanny: tcp://127.0.0.1:63190,
Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-iws4jnsl,Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-iws4jnsl

0,1
Comm: tcp://127.0.0.1:63219,Total threads: 1
Dashboard: http://127.0.0.1:63221/status,Memory: 2.62 GiB
Nanny: tcp://127.0.0.1:63191,
Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-mx94r2d4,Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-mx94r2d4

0,1
Comm: tcp://127.0.0.1:63228,Total threads: 1
Dashboard: http://127.0.0.1:63229/status,Memory: 2.62 GiB
Nanny: tcp://127.0.0.1:63192,
Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-dcznzt6e,Local directory: C:\Users\merti\AppData\Local\Temp\dask-scratch-space\worker-dcznzt6e


In [8]:
## Control variables
# --- Graph ---------------------------------------------------------------
gt = 'grid'                      # graph type passed to generate_connected_graph
graph_param = (8, 5)             # presumably the grid shape (8*5 = 40 = dims[0]) — TODO confirm

# --- Anomalies -----------------------------------------------------------
NoA = 50                         # number of anomalies
dtn = 4                          # anomaly duration
rad = 1                          # anomaly radius
amp = 1                          # anomaly amplitude
window = 'boxcar'
distribution = 'bernoulli'
local_distribution = 'constant'

# --- Low-rank data tensor ------------------------------------------------
dims = (40, 24, 7, 4)
ranks = (8, 8, 5, 4)

# --- Repetitions ---------------------------------------------------------
NoG = 1                          # number of graphs
NoT = 1                          # number of trials per graph

# --- Tensor modes and fixed solver settings ------------------------------
time_m = 2                       # time mode
local_m = 1                      # local mode
lda_2 = 10
psi = 20
maxit = 300
maxit2 = 40

## Independent variables
### Hyperparameter grids: 4 log-spaced values from 1 down to 1e-4
lda_1 = np.logspace(0, -4, 4)
lda_l = np.logspace(0, -4, 4)
lda_t = np.logspace(0, -4, 4)

# Full record of the experiment configuration (key order is significant).
metadata = dict(
    graph_type=gt,
    graph_param=graph_param,
    number_of_anomalies=NoA,
    anomaly_duration=dtn,
    anomaly_radius=rad,
    anomaly_amplitude=amp,
    window=window,
    distribution=distribution,
    local_distribution=local_distribution,
    number_of_graphs=NoG,
    number_of_trials=NoT,
    ranks=ranks,
    dims=dims,
    time_mode=time_m,
    local_mode=local_m,
    lda_2=lda_2,
    lda_1=lda_1,
    lda_l=lda_l,
    lda_t=lda_t,
    psi=psi,
    maxit=maxit,
    maxit2=maxit2,
)

# Define experiment function
def run_exp(X, Y, an_m, inputs):
    """Fit ``lr_stss`` on one synthetic tensor and score the detected anomalies.

    Parameters
    ----------
    X : ndarray
        Clean low-rank tensor; used for the relative reconstruction error.
    Y : ndarray
        Observed tensor (the submission loop builds it as ``X + an``).
    an_m : ndarray
        Ground-truth anomaly mask; used as labels for the ROC AUC.
    inputs : dict
        Per-task settings. Required keys: 'G' (graph), 'psi', 'lda_1',
        'lda_2', 'lda_l', 'lda_t', 'graph_seed', 'anomaly_seed'.
        Optional keys: 'X', 'Y', 'an_m' (override the positional arguments,
        as the original implementation always read them from ``inputs``),
        'graph_type' (default: global ``gt``), 'maxit' / 'maxit2'
        (default: globals ``maxit`` / ``maxit2``).

    Returns
    -------
    dict
        One result row: experiment settings, seeds, 'auc' (ROC AUC of
        ``|S|`` against ``an_m``), 'rec_err' (relative Frobenius error of the
        recovered ``res['X']``) and 'it' as reported by ``lr_stss``.

    Raises
    ------
    numpy.linalg.LinAlgError
        If the graph has an isolated (zero-degree) node, so D^{-1/2} is
        undefined (the original dense ``inv`` raised the same type).
    """
    # `nx` was used here without being imported anywhere in the notebook; it
    # only worked through leftover kernel state. Import it explicitly so the
    # function also works on a fresh kernel and on dask workers.
    import networkx as nx

    # Values in `inputs` take precedence (original behaviour); the positional
    # arguments are now fall-backs instead of being silently discarded.
    X = inputs.get('X', X)
    Y = inputs.get('Y', Y)
    an_m = inputs.get('an_m', an_m)
    psi = inputs['psi']
    run_maxit = inputs.get('maxit', maxit)
    run_maxit2 = inputs.get('maxit2', maxit2)

    # Symmetrically normalised adjacency D^{-1/2} A D^{-1/2}.
    A = nx.adjacency_matrix(inputs['G'])
    deg = np.asarray(np.sum(A, axis=1)).ravel()
    if np.any(deg == 0):
        raise np.linalg.LinAlgError(
            'graph has isolated nodes; degree matrix is not invertible')
    # The inverse square root of a diagonal matrix is elementwise, so there is
    # no need for a dense matrix inverse.
    Dsq = np.diag(1.0 / np.sqrt(deg))
    An = Dsq @ A @ Dsq

    # All-False mask: every entry of Y is observed. Use Y's own shape rather
    # than the global `dims`.
    Y = np.ma.masked_array(Y, mask=np.zeros(np.shape(Y), dtype=bool))

    # Iteration limits were hard-coded (300 / 40) while the result row reported
    # the globals `maxit` / `maxit2`; both now come from the same values.
    res = lr_stss(Y, An, time_m, local_m, verbose=0,
                  max_it2=run_maxit2, max_it=run_maxit,
                  lda_2=inputs['lda_2'], lda_1=inputs['lda_1'],
                  lda_t=inputs['lda_t'], lda_l=inputs['lda_l'],
                  psis=[psi] * 4)

    result = {'graph_type': inputs.get('graph_type', gt),
              'auc': roc_auc_score(an_m.ravel(), np.abs(res['S']).ravel()),
              'rec_err': np.linalg.norm(res['X'] - X) / np.linalg.norm(X),
              'anomaly_cardinality': an_m.sum(),
              'number_of_anomalies': NoA,
              'anomaly_duration': dtn,
              'anomaly_radius': rad,
              'graph_seed': inputs['graph_seed'],
              'anomaly_seed': inputs['anomaly_seed'],
              'anomaly_amplitude': amp,
              'lda_1': inputs['lda_1'],
              'lda_2': inputs['lda_2'],
              'lda_l': inputs['lda_l'],
              'lda_t': inputs['lda_t'],
              'psi_1': psi, 'psi_2': psi,
              'psi_3': psi, 'psi_4': psi,
              'maxit': run_maxit, 'maxit2': run_maxit2,
              'it': res['it']}
    return result

In [14]:
# Submit one dask task per (graph, trial, lda_1, lda_l, lda_t) combination.
futures = []
seed = 21321
for i in range(NoG):
    G, sd = generate_connected_graph(graph_param, gt, seed=seed)
    # NOTE(review): the next graph is generated from the returned seed `sd`.
    # If generate_connected_graph returns the seed it was given, every graph
    # would be identical — confirm against src.util.generate_connected_graph.
    seed = sd
    for j in range(NoT):
        # Previously every trial reused `seed`, so all NoT trials were identical.
        # Offset per trial; j == 0 reproduces the earlier single-trial data.
        trial_seed = seed + j
        # Low-rank "normal" data, scaled to unit standard deviation.
        X = generate_low_rank_data(dims, ranks, trial_seed)
        X = X / np.std(X)
        # Anomalies: use the configured window / local distribution instead of
        # duplicating their values as literals.
        an, an_m = generate_spatio_temporal_anomaly(
            dims, G, NoA, duration=dtn, radius=rad, seed=trial_seed,
            time_m=time_m, local_m=local_m, window_type=window, amplitude=amp,
            distribution=distribution, local_dist=local_distribution)
        Y = X + an  # masking happens inside run_exp

        for lda_1_val in lda_1:
            for lda_l_val in lda_l:
                for lda_t_val in lda_t:
                    # Build a fresh dict per task. Reusing one mutable dict
                    # (formerly `input`, which also shadowed the builtin) and
                    # mutating it after submit risks tasks seeing later values,
                    # depending on when dask serialises the arguments.
                    task_inputs = {
                        'G': G,
                        'graph_type': gt,
                        'graph_seed': sd,
                        'X': X,
                        'anomaly_seed': trial_seed,
                        'an_m': an_m,
                        'Y': Y,
                        'lda_1': lda_1_val,
                        'lda_l': lda_l_val,
                        'lda_t': lda_t_val,
                        'lda_2': lda_2,
                        'psi': psi,
                    }
                    futures.append(
                        client.submit(run_exp, X, Y, an_m, task_inputs))

Graph is connected.


  A = nx.adjacency_matrix(G)


ValueError: cannot reshape array of size 26880 into shape (215040,)

In [8]:
# Collect every experiment result before tearing the cluster down. Shutting
# down while `futures` are still pending would cancel them and discard all
# results under Restart & Run All. `finally` ensures the cluster is released
# even if a task failed and `gather` re-raises its exception.
try:
    results = pd.DataFrame(client.gather(futures))
finally:
    client.shutdown()

In [12]:
# Debug cell: re-run the anomaly generator outside dask to inspect it.
# Depends on `G` and `seed` left over from the submission cell; remove it once
# the generator has been checked. Uses the configured window / local
# distribution rather than duplicating their values as literals.
an, an_m = generate_spatio_temporal_anomaly(
    dims, G, NoA, duration=dtn, radius=rad, seed=seed,
    time_m=time_m, local_m=local_m, window_type=window, amplitude=amp,
    distribution=distribution, local_dist=local_distribution)

  A = nx.adjacency_matrix(G)


In [15]:
# Exploration only: print the signature and docstring of client.submit.
# NOTE(review): not needed for the experiment; consider removing it from the final notebook.
help(client.submit)

Help on method submit in module distributed.client:

submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='100 ms', allow_other_workers=False, actor=False, actors=False, pure=None, **kwargs) method of distributed.client.Client instance
    Submit a function application to the scheduler
    
    Parameters
    ----------
    func : callable
        Callable to be scheduled as ``func(*args **kwargs)``. If ``func`` returns a
        coroutine, it will be run on the main event loop of a worker. Otherwise
        ``func`` will be run in a worker's task executor pool (see
        ``Worker.executors`` for more information.)
    *args : tuple
        Optional positional arguments
    key : str
        Unique identifier for the task.  Defaults to function-name and hash
    workers : string or iterable of strings
        A set of worker addresses or hostnames on which computations may be
        performed. Leave empty to default to all workers (commo