[View in Colaboratory](https://colab.research.google.com/github/DongminWu/MLpractice/blob/master/deep_siamese_one_shot_learning.ipynb)

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline


from scipy.stats import norm

from scipy.signal import triang

from sklearn.utils import shuffle

# Data Generation

In [0]:
# Sizing of the synthetic dataset.
# num_of_each_type: sequences generated per Gaussian / triangle variant.
# num_of_weekend:   sequences generated per sparse-noise ("empty") set.
num_of_each_type, num_of_weekend = 40, 13
# Samples per sequence (1440); presumably one sample per minute of a day.
day_length = 24 * 60

In [0]:
def gaussian_pdf(mu, sigma, length):
  '''
  Sample a Gaussian density curve on a fixed grid.

  The curve is evaluated at `length` evenly spaced points covering [-5, 5].

  mu: mean of the gaussian distribution
  sigma: standard deviation of the gaussian distribution (it is used as
         sigma in the normalisation and as sigma**2 in the exponent)
  length: the length of the returned sequence

  Returns a 1-D numpy array holding the density at every grid point.
  '''
  grid = np.linspace(-5, 5, length)
  scale = 1 / (sigma * np.sqrt(2 * np.pi))
  exponent = -np.square(grid - mu) / (2 * sigma**2)
  return scale * np.exp(exponent)

                
  

# Triangle wave

def triangle_pdf(mid, width, length):
  '''
  Build a triangular bump on a grid of `length` points covering [-5, 5].

  mid: centre of the triangle's support on the grid
  width: full width of the support (half of it on each side of mid)
  length: the length of the returned sequence

  Grid points strictly below mid - width/2 or strictly above mid + width/2
  are 0. The points in between are filled with the symmetric triangular
  window returned by triang().

  Returns a 1-D numpy array of `length` values.
  '''
  grid = np.linspace(-5, 5, length)
  half_width = float(width) / 2

  # Number of zero-valued samples on each side of the support.
  n_left = int(np.count_nonzero(grid < mid - half_width))
  n_right = int(np.count_nonzero(grid > mid + half_width))

  bump = triang(length - n_left - n_right)
  return np.concatenate([np.zeros(n_left), bump, np.zeros(n_right)])


def random_pdf(prob, max, length):
  '''
  Generate a sparse sequence of random spikes.

  prob: chance that any given position receives a spike
  max: spikes are drawn with np.random.uniform(-max, max)
  length: the length of the returned sequence

  Positions that are not selected stay 0. Returns a numpy array built from
  the resulting list.
  '''
  values = [0] * length

  for position in range(length):
    # One draw decides whether this position fires; a second draw, made only
    # when it does, picks the spike value.
    if np.random.random() < prob:
      values[position] = np.random.uniform(-max, max)

  return np.array(values)


def generate_binary(distribution):
  '''
  Draw one 0/1 sample per entry of `distribution`.

  distribution: iterable of probabilities; an entry p yields 1 with
                probability p and 0 with probability 1 - p

  Returns a numpy array with one sample per entry.
  '''
  outcomes = np.array([0, 1])
  samples = []
  for p in distribution:
    samples.append(np.random.choice(outcomes, p=[1 - p, p]))
  return np.array(samples)

## Train set

In [0]:
# 4 sensors (presumably the g1 / g2 / t1 / t2 profiles below), plus two sets of
# sparse-noise ("empty") sequences.

# The Gaussian and triangle profiles are deterministic, so each is computed
# once and then sampled num_of_each_type times.
g1_profile = gaussian_pdf(0, 1, day_length)
g1_set = [generate_binary(g1_profile) for _ in range(num_of_each_type)]

g2_profile = gaussian_pdf(1, 1.3, day_length)
g2_set = [generate_binary(g2_profile) for _ in range(num_of_each_type)]

t1_profile = triangle_pdf(0, 5, day_length)
t1_set = [generate_binary(t1_profile) for _ in range(num_of_each_type)]

t2_profile = triangle_pdf(-1, 4, day_length)
t2_set = [generate_binary(t2_profile) for _ in range(num_of_each_type)]

# random_pdf is itself random, so a fresh profile is drawn for every sequence.
# Negative spike values are clipped to 0 so every entry is a valid probability.
empty1_set = [
    generate_binary(np.clip(random_pdf(0.1, 0.2, day_length), 0, 1))
    for _ in range(num_of_weekend)
]
empty2_set = [
    generate_binary(np.clip(random_pdf(0.1, 0.2, day_length), 0, 1))
    for _ in range(num_of_weekend)
]


In [23]:
# Positive ("neighbor") pairs: shuffle each waveform family and pair up
# consecutive rows, so both members of a pair come from the same family.
# The reshape to (-1, 2, width) requires an even number of rows per family.
# The width is taken from the array itself rather than hard-coded as 1440, so
# the pairing keeps working if the sequence length changes.
g_set = np.concatenate([g1_set, g2_set])
np.random.shuffle(g_set)
g_pair_set = g_set.reshape([-1, 2, g_set.shape[-1]])

t_set = np.concatenate([t1_set, t2_set])
np.random.shuffle(t_set)
t_pair_set = t_set.reshape([-1, 2, t_set.shape[-1]])

empty_set = np.concatenate([empty1_set, empty2_set])
np.random.shuffle(empty_set)
empty_pair_set = empty_set.reshape([-1, 2, empty_set.shape[-1]])

trainX_neighbors = np.concatenate([g_pair_set, t_pair_set, empty_pair_set])
# Every neighbor pair gets label 1.
trainY_neighbors = np.ones([trainX_neighbors.shape[0],1])

print("trainX_neighbors", trainX_neighbors.shape)
print("trainY_neighbors", trainY_neighbors.shape)



('trainX_neighbors', (93, 2, 1440))
('trainY_neighbors', (93, 1))


In [24]:

# Negative ("no neighbor") pairs: each pair joins two sequences taken from two
# different waveform families and is labelled 0. One negative pair is built per
# positive pair so the two classes are balanced.
candidate_sets = [t_set, g_set, empty_set]

trainX_no_neighbors = []
for _ in range(trainX_neighbors.shape[0]):
  # Choose the families by index: np.random.choice expects a 1-D input and
  # cannot sample from a list of differently sized 2-D arrays. replace=False
  # keeps the two families distinct, so a pair is never a same-family pair
  # mislabelled as a negative.
  first_set_idx, second_set_idx = np.random.choice(len(candidate_sets), size=2, replace=False)
  first_set = candidate_sets[first_set_idx]
  second_set = candidate_sets[second_set_idx]
  # np.random.choice(n) samples from 0..n-1, so passing the full row count
  # makes the last row reachable too.
  first_idx = np.random.choice(first_set.shape[0])
  second_idx = np.random.choice(second_set.shape[0])
  sample = np.array([first_set[first_idx], second_set[second_idx]])
  trainX_no_neighbors.append(sample)

trainX_no_neighbors = np.stack(trainX_no_neighbors)
# Label 0 distinguishes these pairs from the neighbor pairs, which are labelled 1.
trainY_no_neighbors = np.zeros([trainX_no_neighbors.shape[0],1])
print("trainX_no_neighbors", trainX_no_neighbors.shape)
print("trainY_no_neighbors", trainY_no_neighbors.shape)

('trainX_no_neighbors', (93, 2, 1440))
('trainY_no_neighbors', (93, 1))


In [25]:
# Stack the neighbor and non-neighbor pairs (and their labels), then shuffle
# both arrays together so every pair keeps its own label.
trainX, trainY = shuffle(
    np.concatenate((trainX_neighbors, trainX_no_neighbors)),
    np.concatenate((trainY_neighbors, trainY_no_neighbors)),
)
print("trainX", trainX.shape)
print("trainY", trainY.shape)

('trainX', (186, 2, 1440))
('trainY', (186, 1))


## Test Set

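The test set is built in the same way as the training set; the only difference is the parameters of each waveform (mean and standard deviation for the Gaussians, centre and width for the triangles).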


In [26]:
# 4 sensors (presumably the g1 / g2 / t1 / t2 profiles below), plus two sets of
# sparse-noise ("empty") sequences. Same recipe as the training data, with
# different waveform parameters.

# The Gaussian and triangle profiles are deterministic, so each is computed
# once and then sampled num_of_each_type times.
g1_profile = gaussian_pdf(0.5, 1.2, day_length)
g1_set = [generate_binary(g1_profile) for _ in range(num_of_each_type)]

g2_profile = gaussian_pdf(-0.3, 1.9, day_length)
g2_set = [generate_binary(g2_profile) for _ in range(num_of_each_type)]

t1_profile = triangle_pdf(0, 3, day_length)
t1_set = [generate_binary(t1_profile) for _ in range(num_of_each_type)]

t2_profile = triangle_pdf(-2, 3, day_length)
t2_set = [generate_binary(t2_profile) for _ in range(num_of_each_type)]

# random_pdf is itself random, so a fresh profile is drawn for every sequence.
# Negative spike values are clipped to 0 so every entry is a valid probability.
empty1_set = [
    generate_binary(np.clip(random_pdf(0.1, 0.2, day_length), 0, 1))
    for _ in range(num_of_weekend)
]
empty2_set = [
    generate_binary(np.clip(random_pdf(0.1, 0.2, day_length), 0, 1))
    for _ in range(num_of_weekend)
]

# Positive ("neighbor") pairs: shuffle each waveform family and pair up
# consecutive rows, so both members of a pair come from the same family.
# The reshape to (-1, 2, width) requires an even number of rows per family.
# The width is taken from the array itself rather than hard-coded as 1440, so
# the pairing keeps working if the sequence length changes.
g_set = np.concatenate([g1_set, g2_set])
np.random.shuffle(g_set)
g_pair_set = g_set.reshape([-1, 2, g_set.shape[-1]])

t_set = np.concatenate([t1_set, t2_set])
np.random.shuffle(t_set)
t_pair_set = t_set.reshape([-1, 2, t_set.shape[-1]])

empty_set = np.concatenate([empty1_set, empty2_set])
np.random.shuffle(empty_set)
empty_pair_set = empty_set.reshape([-1, 2, empty_set.shape[-1]])

testX_neighbors = np.concatenate([g_pair_set, t_pair_set, empty_pair_set])
# Every neighbor pair gets label 1.
testY_neighbors = np.ones([testX_neighbors.shape[0],1])

print("testX_neighbors", testX_neighbors.shape)
print("testY_neighbors", testY_neighbors.shape)

# Negative ("no neighbor") pairs: each pair joins two sequences taken from two
# different waveform families and is labelled 0. One negative pair is built per
# positive pair so the two classes are balanced.
candidate_sets = [t_set, g_set, empty_set]

testX_no_neighbors = []
for _ in range(testX_neighbors.shape[0]):
  # Choose the families by index: np.random.choice expects a 1-D input and
  # cannot sample from a list of differently sized 2-D arrays. replace=False
  # keeps the two families distinct, so a pair is never a same-family pair
  # mislabelled as a negative.
  first_set_idx, second_set_idx = np.random.choice(len(candidate_sets), size=2, replace=False)
  first_set = candidate_sets[first_set_idx]
  second_set = candidate_sets[second_set_idx]
  # np.random.choice(n) samples from 0..n-1, so passing the full row count
  # makes the last row reachable too.
  first_idx = np.random.choice(first_set.shape[0])
  second_idx = np.random.choice(second_set.shape[0])
  sample = np.array([first_set[first_idx], second_set[second_idx]])
  testX_no_neighbors.append(sample)

testX_no_neighbors = np.stack(testX_no_neighbors)
# Label 0 distinguishes these pairs from the neighbor pairs, which are labelled 1.
testY_no_neighbors = np.zeros([testX_no_neighbors.shape[0],1])
print("testX_no_neighbors", testX_no_neighbors.shape)
print("testY_no_neighbors", testY_no_neighbors.shape)


# Stack the neighbor and non-neighbor pairs (and their labels), then shuffle
# both arrays together so every pair keeps its own label.
testX, testY = shuffle(
    np.concatenate((testX_neighbors, testX_no_neighbors)),
    np.concatenate((testY_neighbors, testY_no_neighbors)),
)
print("testX", testX.shape)
print("testY", testY.shape)

('testX_neighbors', (93, 2, 1440))
('testY_neighbors', (93, 1))
('testX_no_neighbors', (93, 2, 1440))
('testY_no_neighbors', (93, 1))
('testX', (186, 2, 1440))
('testY', (186, 1))


# Model building

![Model architecture](https://cloud.githubusercontent.com/assets/9861437/20479454/405a1aea-b004-11e6-8a27-7bb05cf0a002.png)

In [0]:
from keras import Sequential
from keras import Model
from keras.layers import LSTM, Bidirectional, Input

In [0]:
# Number of units passed to every LSTM layer of the model.
dim_lstm_layer = 64

In [0]:
# One input per member of a pair. shape=(None, 1) declares sequences of
# unspecified length with a single feature per time step.
input1 = Input(shape=(None, 1), name = 'input_layer1')
input2 = Input(shape=(None, 1), name = 'input_layer2')

# A siamese network encodes both inputs with the same weights. Each recurrent
# layer is therefore created once and applied to both branches; creating a
# separate layer per branch would give each branch its own independent weights.
shared_bi_lstm1 = Bidirectional(LSTM(dim_lstm_layer, return_sequences=True, name="LSTM_layer1"))
shared_bi_lstm2 = Bidirectional(LSTM(dim_lstm_layer, return_sequences=True, name="LSTM_layer2"))

# First recurrent layer, applied to each input.
bi_lstm1_1 = shared_bi_lstm1(input1)
bi_lstm1_2 = shared_bi_lstm1(input2)

# Second recurrent layer, stacked on the first; return_sequences=True keeps the
# per-time-step outputs available to whatever follows.
bi_lstm2_1 = shared_bi_lstm2(bi_lstm1_1)
bi_lstm2_2 = shared_bi_lstm2(bi_lstm1_2)


