In [1]:
from __future__ import print_function
%matplotlib inline
import utils
import numpy as np
from matplotlib import pyplot as plt
from sklearn.cluster import SpectralClustering
from scipy.spatial.distance import cosine, euclidean
from numpy.random import permutation as rpm
from plot import plot_images, plot_confusion_matrix
from utils import resize, sample_index, gen_solution, get_purity, get_nmi
from dtw import dtw
from numpy.linalg import norm
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import DenseMatrix, DenseVector
from pyspark.mllib.linalg.distributed import RowMatrix,IndexedRowMatrix
from pyspark.mllib.linalg import DenseMatrix, DenseVector
from fastdtw import *
from time import time
import json
import csv
import itertools
import sys



In [3]:
# Build (or reuse) the Spark session used by the distributed cells below.
# NOTE(review): the master URL and resource sizes are hard-coded for one
# specific cluster -- confirm they still apply before running.
spark = (
    SparkSession.builder
    .master("spark://10.140.81.48:7077")
    .appName("DTW")
    .config("spark.executor.memory", "10g")
    .config("spark.executor.cores", "5")
    .config("spark.driver.memory", "20G")
    .config("spark.cores.max", "80")
    .getOrCreate()
)

# SparkContext handle; restrict Spark logging to errors only.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Partition count passed to sc.parallelize() in later cells.
numPartitions = 160

In [2]:
# Load the train / dev / test JSON dumps. Each file is opened in a `with`
# block so its handle is closed as soon as parsing finishes (the previous
# version left all three handles open).
with open('pad_swbd_train_.json') as json1_file:
    train_data = json.load(json1_file)

with open('pad_swbd_dev_.json') as json1_file:
    dev_data = json.load(json1_file)

with open('pad_swbd_test_.json') as json1_file:
    test_data = json.load(json1_file)

# Concatenate the "data" entries of the three splits in train, dev, test order.
json1_data = train_data["data"] + dev_data["data"] + test_data["data"]

In [4]:
#learn pca
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

# Distribute the concatenated rows; count() forces the cached RDD to materialise.
X = sc.parallelize(json1_data,numPartitions).cache()
X.count()

# Project every row onto the first 10 principal components.
mat = RowMatrix(X)
pc = mat.computePrincipalComponents(10)
projected = mat.multiply(pc)

ca = projected.rows.cache()
ca.count()

#recombine
# Bring the projected rows back to the driver and regroup them into one array
# per signal, using the first element of each `frames` entry as the group id.
data = ca.collect()
frames = train_data["frames"]+dev_data["frames"] +test_data["frames"]

# Every projected row needs a matching frame entry; fail loudly otherwise
# instead of silently mis-grouping.
if len(frames) != len(data):
    raise ValueError(
        'frames/data length mismatch: {} vs {}'.format(len(frames), len(data)))

# Consecutive rows sharing the same id form one signal. groupby() takes the
# first group's id from the data itself, so no initial sentinel id is needed
# (the previous `prev = 0` start emitted an empty leading array whenever the
# first id was not 0).
t_data = [
    np.array([row.toArray() for row, _ in group])
    for _, group in itertools.groupby(zip(data, frames),
                                      key=lambda pair: pair[1][0])
]
    

2015748

In [252]:
# load padded data
# The .npy file holds a pickled dict (hence .item()); NumPy >= 1.16.3 refuses
# to unpickle unless allow_pickle=True is passed explicitly.
# NOTE(review): unpickling can execute arbitrary code -- only load trusted files.
# NOTE: this rebinds `data`, which previously held the collected PCA rows.
data=np.load('data/swbd.npy', allow_pickle=True).item()

# Per-split inputs come from the .npy file; labels come from the JSON dumps.
# The word label is the part of the raw label before the first underscore.
train_x = data['train']['data']
train_y_raw = train_data["labels"]
train_y = [label.split('_')[0] for label in train_y_raw]
dev_x = data['dev']['data']
dev_y_raw = dev_data["labels"]
dev_y = [label.split('_')[0] for label in dev_y_raw]
test_x = data['test']['data']
test_y_raw = test_data["labels"]
test_y = [label.split('_')[0] for label in test_y_raw]

# used in clustering
signals = t_data  # list of numpy.ndarray, one per signal (PCA-projected rows regrouped above)
labels = train_y+dev_y+test_y  # list of word strings, train + dev + test order. e.g. 'abandoned'

print('number of signals: {}'.format(len(signals)))

number of signals: 31961


In [253]:
# count number of words. Change strings to numeric labels
wordSet = set(labels)
print('total number of words is {}'.format(len(wordSet)))

# make a list. let list index be the numeric label of word
# Sorted so that the word -> id assignment is reproducible across kernel
# restarts (plain set iteration order is not guaranteed to be stable).
wordList = sorted(wordSet)

# numeric label of signals
# A dict lookup replaces wordList.index(), which rescanned the whole
# vocabulary for every single label.
wordToIndex = {word: idx for idx, word in enumerate(wordList)}
numLabels = np.array([wordToIndex[word] for word in labels])

total number of words is 6204


# Loading Embedding Data

In [254]:
# load embedding data
# The .npy file holds a pickled dict (hence .item()); NumPy >= 1.16.3 refuses
# to unpickle unless allow_pickle=True is passed explicitly.
# NOTE(review): unpickling can execute arbitrary code -- only load trusted files.
emb_data = np.load('data/swbd_embeddings.npy', allow_pickle=True).item()

# Word labels are derived from the embedding file's own raw labels. The
# previous version split train_y_raw / dev_y_raw / test_y_raw here, leaving
# the emb_*_y_raw values loaded but unused.
emb_train_x = emb_data['train']['embs']
emb_train_y_raw = emb_data['train']['labels']
emb_train_y = [label.split('_')[0] for label in emb_train_y_raw]
emb_dev_x = emb_data['dev']['embs']
emb_dev_y_raw = emb_data['dev']['labels']
emb_dev_y = [label.split('_')[0] for label in emb_dev_y_raw]
emb_test_x = emb_data['test']['embs']
emb_test_y_raw = emb_data['test']['labels']
emb_test_y = [label.split('_')[0] for label in emb_test_y_raw]

# Per the original author's note the labels match those of the raw data, so
# only the embedding matrix itself is used in clustering.
# Rows are stacked in train, dev, test order.
emb_signals = np.concatenate((emb_train_x,emb_dev_x,emb_test_x), axis = 0) # np.ndarray 31961*512
print('shape of embeddings: {}'.format(emb_signals.shape))

shape of embeddings: (31961, 512)


# Small raw data and small labels

In [255]:
# Number of most frequent words kept for the small subset.
class_num = 20

# Tally how many signals carry each word label.
wordBags = dict()
for word in labels:
    wordBags[word] = wordBags.get(word, 0) + 1

# (word, count) pairs, most frequent first; sorted() is stable, so words with
# equal counts keep the order in which wordBags.items() yields them.
sorted_by_value = sorted(wordBags.items(), key=lambda kv: -kv[1])
print(sorted_by_value[:class_num])
print('total number of series: {}'.format(
    sum(pair[1] for pair in sorted_by_value[:class_num])))

[('because', 340), ('recycling', 315), ('benefits', 259), ('something', 230), ('exactly', 228), ('probably', 225), ('insurance', 196), ('punishment', 190), ('everything', 174), ('company', 149), ('sometimes', 144), ('interesting', 143), ('recycle', 143), ('situation', 141), ('problem', 139), ('anything', 131), ('plastic', 127), ('actually', 125), ('understand', 123), ('vacation', 123)]
total number of series: 3645


In [256]:
# Build the small data set: keep only signals whose word is among the
# class_num most frequent ones.
small_index_list = [entry[0] for entry in sorted_by_value[:class_num]]

small_raw_signal, small_emb_data, small_num_labels = [], [], []
for idx, label in enumerate(labels):
    if label not in small_index_list:
        continue
    # Raw signal, embedding row and numeric label are appended together, so
    # the three lists stay index-aligned.
    small_raw_signal.append(signals[idx])
    small_emb_data.append(emb_signals[idx, :])
    # Numeric label = position of the word in small_index_list.
    small_num_labels.append(small_index_list.index(label))

In [257]:
# Stack the selected embedding rows into one float matrix with 512 columns,
# one row per selected signal.
small_emb_signal = np.zeros((len(small_emb_data), 512))
for row, embedding in enumerate(small_emb_data):
    small_emb_signal[row] = embedding

Small data summary: 
1. small_index_list: list of length 20. A word's position in this list is its numeric label.

    ['because', 'recycling', 'benefits', 'something', 'exactly', 'probably', 
    'insurance', 'punishment', 'everything', 'company', 'sometimes',
    'interesting', 'recycle', 'situation', 'problem', 'anything', 
    'plastic', 'actually', 'understand', 'vacation']
                 
                 
2. small_raw_signal: # list of numpy.ndarray  3645 * [time*39]

3. small_emb_signal: # numpy.ndarray 3645 * 512

4. small_num_labels: # list of length 3645. Numeric label (index into small_index_list) for each signal.

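The embedding signals are aligned index-for-index with the raw-data signals: row i of small_emb_signal and entry i of small_raw_signal come from the same original signal.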

In [237]:
# Sanity check: display the shape of the first selected raw signal.
small_raw_signal[0].shape

(53, 60)

# Play with DTW distance

In [None]:
#original
# Pairwise DTW distances between the small raw signals, computed with Spark,
# followed by hierarchical clustering on the resulting distance matrix.
from clustering import kmeans_cluster, hierarchical_cluster, hierarchical_cluster_with_distMat
from sklearn.metrics import adjusted_rand_score


def wrapper_dtw(data):
    """Build the per-row DTW worker that is mapped over the RDD.

    Parameters
    ----------
    data : sequence of arrays, or an object exposing that sequence as ``.value``
        All signals to compare against. Passing the Spark broadcast handle
        itself (rather than its ``.value``) lets executors read the shared
        broadcast copy instead of receiving the whole data set inside every
        pickled closure. A plain sequence is still accepted.

    Returns
    -------
    callable
        ``_dtw((idx, x)) -> (idx, res)`` where ``res[i]`` is the fastdtw
        distance between ``x`` and signal ``i`` for ``i >= idx``. Entries with
        ``i < idx`` stay 0, i.e. only the upper triangle of the distance
        matrix is computed.
    """
    def _dtw(x_tup):
        idx, x = x_tup
        # Resolved here so a broadcast handle is dereferenced on the worker.
        series = getattr(data, 'value', data)
        res = np.zeros(len(series))
        task_start = time()
        for i in range(idx, len(series)):
            res[i] = fastdtw(x, series[i], dist=lambda u, v: norm(u - v))[0]
        # Per-row elapsed time; printed by whichever process runs the task.
        print(time() - task_start)
        return (idx, res)
    return _dtw


n_data = len(small_num_labels)

# Pair every signal with its row index so the rows can be re-ordered after the
# distributed map; the shuffle spreads the rows across partitions.
zip_small_raw_signal = list(zip(range(len(small_raw_signal)),small_raw_signal))
distributed_raw = sc.parallelize(zip_small_raw_signal,numPartitions).coalesce(numPartitions,shuffle=True).cache()
start = time()

# Hand the broadcast handle (not sc_data.value) to the worker factory so the
# broadcast is actually used.
sc_data = sc.broadcast(small_raw_signal)
tempo = distributed_raw.map(wrapper_dtw(sc_data)).cache()
print(tempo.count())

# Rows come back as (idx, res) pairs; sortByKey() orders them by idx. Its
# first positional parameter is the `ascending` flag, not a key function.
Euc_Lap = tempo.sortByKey().map(lambda x: x[1]).collect()

# Mirror the strict upper triangle into the lower one to obtain the full
# symmetric matrix. This builds a new array, so the collected rows in Euc_Lap
# are left untouched (the previous loop aliased Euc_Lap and iterated over the
# undefined name `test_labels`).
upper = np.array(Euc_Lap).reshape(n_data, n_data)
Euc_Lap2 = upper + np.triu(upper, k=1).T

# Hierarchical clustering using the DTW distance matrix.
# NOTE(review): method='ward' is combined with metric='cosine' on what is
# already a distance matrix -- confirm how the helper interprets these.
linkage_matrix, pred = hierarchical_cluster_with_distMat(np.array(Euc_Lap2), n_clusters=len(small_index_list),
                                                         method='ward', metric='cosine')
_, cm, purity = get_purity(pred, small_num_labels)
nmi = get_nmi(pred, small_num_labels)
plot_confusion_matrix(cm, small_index_list, normalize=False, rotation=90, figsize=(9, 9))
print("Ward, NMI: %.3f, Purity: %.3f" % (nmi, purity))

# Wall-clock seconds since `start` (taken just before the broadcast).
print(time()-start)

# Agreement between predicted clusters and the true numeric labels.
print(adjusted_rand_score(pred,small_num_labels))
