In [1]:
# DISTRIBUTED ANOMALY DETECTION ALGORITHM BASED ON DISSIMILARITY MAPPING FILTERING
#S. Morante, J. G. Victores and C. Balaguer, "Automatic demonstration and feature selection for robot learning," Humanoid Robots (Humanoids), 2015 IEEE-RAS 15th International Conference on, Seoul, 2015, pp. 428-433.
#doi: 10.1109/HUMANOIDS.2015.7363569
#URL: http://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=7363569&isnumber=7362951

import numpy as np


# Building-block functions for the three stages of the algorithm: dissimilarity, mapping and filtering
def dissimilarity(doublePairs):
  """DISSIMILARITY stage: distance between two indexed samples.

  doublePairs -- a pair of (index, value) records, e.g. one element of
                 ``dataIndex.cartesian(dataIndex)``: ((i, x_i), (j, x_j)).

  Returns (i, ||x_i - x_j||). The result is keyed by the index of the FIRST
  sample only, so that the MAPPING stage can add up every distance measured
  from sample i.

  Values may be scalars or equal-length feature vectors (lists, tuples or
  numpy arrays). Both are passed through np.asarray, so vector samples are
  subtracted element-wise instead of failing on ``list - list``.
  """
  key = doublePairs[0][0]
  first, second = doublePairs[0][1], doublePairs[1][1]
  return key, np.linalg.norm(np.asarray(first) - np.asarray(second))
  
def mapping(a,b):
  """MAPPING stage reducer: merge two partial dissimilarity totals.

  Passed to reduceByKey, so it folds all the distances recorded under one
  key into a single accumulated value. Returns a + b.
  """
  combined = a + b
  return combined
  
def filtering(pairs, meanData, stdevData, theta):
  """FILTERING stage: decide whether a point's accumulated dissimilarity is anomalous.

  pairs     -- (index, accumulatedDistance) record produced by the MAPPING stage.
  meanData  -- mean of all accumulated distances.
  stdevData -- standard deviation of all accumulated distances.
  theta     -- Z-score threshold. The point is kept when its Z-score is
               strictly greater than theta. The test is one-sided: only points
               whose accumulated distance is unusually LARGE are flagged.

  Returns True when the point should be reported as an anomaly.
  """
  stdev = float(stdevData)
  # A zero spread means no point deviates from the mean. Report nothing instead
  # of dividing by zero (ZeroDivisionError on floats, nan/inf on numpy scalars).
  if stdev == 0.0:
    return False
  return (pairs[1] - meanData) / stdev > theta
  
    

In [2]:
# DATA: 50-sample sine wave, sin(t) for t = 0..49, with outliers injected at t = 10, 20 and 30
data = np.sin(np.arange(50)).tolist()
data[10], data[20], data[30] = 35, 32, 60

# DATA: distribute the samples as an RDD
dataParallel = sc.parallelize(data)

# DATA: attach each sample's position and move it to the front, giving (index, value) records
dataIndex = dataParallel.zipWithIndex().map(lambda valueAndIndex: (valueAndIndex[1], valueAndIndex[0]))
dataIndex.take(20)

In [3]:
# DATA: pair every (index, value) record with every record, itself included,
# giving N*N ordered pairs ((i, x_i), (j, x_j))
dataCrossJoin = dataIndex.cartesian(other=dataIndex)
dataCrossJoin.take(20)

In [4]:
# WARNING: the cross join is deliberately NOT pruned to pairs with i < j.
# dissimilarity() keys each distance by the FIRST index only, so point i needs every
# pair (i, j) to be present. Dropping the mirrored (j, i) pairs would leave some
# points with only partial sums in the MAPPING stage.

### DISSIMILARITY: distance from each point i to each point j, keyed by i
dataDissimilarity = dataCrossJoin.map(dissimilarity, preservesPartitioning=False)
dataDissimilarity.take(20)

In [5]:
### MAPPING: reduce the comparisons to a single value per point
# Sum every distance keyed by point i, giving (i, total distance from i to all points).
# cache(): this RDD is consumed by several actions (take here, then the statistics
# and the filter/collect in the next cell). Without caching, each of those actions
# recomputes the whole N*N cartesian -> map -> reduceByKey shuffle lineage.
dataMapping = dataDissimilarity.reduceByKey(mapping).cache()
dataMapping.take(20)

In [6]:
### FILTERING: filtering data by threshold (theta) using Z-score
# stats() returns one StatCounter (count, mean, variance) from a single Spark job.
# Calling .mean() and .stdev() separately would launch two jobs over the same data.
mappingStats = dataMapping.values().stats()
meanData = mappingStats.mean()
stdevData = mappingStats.stdev()   # population standard deviation, same value RDD.stdev() returns
# Z-score threshold: theta = 0 reports every point whose accumulated distance is above the mean
theta = 0
dataFiltering = dataMapping.filter(lambda pairs: filtering(pairs, meanData, stdevData, theta))
dataFiltering.collect()