# Generate multi-channel features

In [None]:
import pandas as pd
import numpy as np
import itertools
import matplotlib.pyplot as plt

# Dataset selection: the letter below picks ./data/data_<letter>.csv and
# also forms the dataset name (used further down to locate the kernel file).
data = 'D' # ****** specify dataset (A / B / C / D) ******
dataset_name = f'dataSet{data}'
dataset = pd.read_csv(f'./data/data_{data}.csv')

## Generate sliding-window features

In [None]:
# Generate features
#
# For every window size in [min_window, max_window], enumerate all binary
# patterns of that length. For each pattern, mark every sliding position of
# every dataset row with 1 if the row matches the pattern there, else 0.
# One CSV per (window size, pattern) is written to ./features/; rows follow
# the dataset row order and columns follow the sliding position (stride 1).
all_windows = []
min_window = 2  # minimum window size
max_window = 5  # maximum window size

# The scanned columns are everything except the first and the last column.
# Pull them out once: re-slicing the DataFrame for every single comparison
# dominated the run time of this cell.
sequences = dataset.iloc[:, 1:-1].to_numpy()

for window_size in range(min_window, max_window + 1):
    # All 2**window_size binary patterns, in itertools.product order.
    windows = [list(bits) for bits in itertools.product(range(2), repeat=window_size)]
    all_windows.append(windows)

    # NOTE(review): 32 is presumably the sequence length. If the scanned
    # region really has 32 columns, this skips the last valid start position
    # (32 - window_size + 1 positions exist). It is kept as-is because the
    # combine step and the model input shape use the same expression.
    n_positions = 32 - window_size

    print(f'generating features with window size {window_size}...')
    for window in windows:
        print(f'window:{window}')
        # A slice that runs past the last column is shorter than the pattern
        # and therefore never matches.
        sliding_activation = [
            [
                int(row[start:start + window_size].tolist() == window)
                for start in range(n_positions)
            ]
            for row in sequences
        ]
        pd.DataFrame(sliding_activation).to_csv(f'./features/window_size_{window_size}_window_{window}.csv',index=False)

In [None]:
# Combine features
#
# For each window size, merge the per-pattern activation CSVs into a single
# 2-D table with one row per dataset row. Within a row the values are laid
# out position by position, and inside each position pattern by pattern, so
# the table can later be reshaped to (rows, positions, patterns).
print('combining features ...')

for window_size, windows in zip(range(min_window, max_window + 1), all_windows):
    n_positions = 32 - window_size

    # Read every per-pattern file exactly once (previously each file was
    # re-read for every column), keeping the first n_positions columns.
    per_pattern = [
        pd.read_csv(f'./features/window_size_{window_size}_window_{window}.csv')
        .iloc[:, :n_positions]
        .to_numpy()
        for window in windows
    ]

    # Stack along a new last axis: (rows, positions, patterns).
    stacked = np.stack(per_pattern, axis=-1)

    #save data
    df_save = stacked.reshape(len(stacked), -1)
    np.savetxt(f"./features/sliding_window_{window_size}.txt",df_save)
print('done')

# Multi-channel 1D poly-CNN

In [None]:
from keras.models import Sequential
from keras import Model
from tensorflow.keras.optimizers import Adam
from keras.layers import Dense, Conv1D, Flatten, Concatenate, Input

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

In [None]:
#load kernels
# Each line of the .kern file is one whitespace-separated row of floats.
# Row i is later combined with dataset row i, so blank lines are kept as
# empty rows rather than dropped.
with open(f'./features/{dataset_name}.kern') as kern_file:
    kernels = kern_file.readlines()
kernel_matrix = [[float(value) for value in row.split()] for row in kernels]

#load data
# Must be one of the window sizes for which a sliding_window_<size>.txt file
# has been generated.
window_size = 5 # ****** specify window size ******

# One channel per binary pattern of length window_size.
n_channel = 2**window_size

# ndmin=2 keeps X two-dimensional even when the file holds a single row.
X = np.loadtxt(f"./features/sliding_window_{window_size}.txt", ndmin=2)
# (rows, positions * channels) -> (rows, positions, channels)
X = X.reshape(X.shape[0], X.shape[1] // n_channel , n_channel)
interaction_param, y = dataset['interaction_parameter'], dataset['lamellar_period']

In [None]:
# train/test split
# Random 80/20 split over row indices; the test set is every row that was
# not drawn for training.
n_samples = len(X)
n_train = int(0.8 * n_samples)
train_idx = np.random.choice(range(n_samples), size=n_train, replace=False)
test_idx = list(set(range(n_samples)) - set(train_idx))

X_train, X_test = X[train_idx], X[test_idx]
y_train = np.array([y[row] for row in train_idx])
y_test = np.array([y[row] for row in test_idx])


def _aux_rows(indices):
    """Build the auxiliary input: interaction parameter followed by the kernel row."""
    return np.array([[interaction_param[row]] + kernel_matrix[row] for row in indices])


# auxiliary input (interaction parameter + kernel values) for each subset
interaction_kernel_train = _aux_rows(train_idx)
interaction_kernel_test = _aux_rows(test_idx)

In [None]:
# Two-input regression network:
#   input_a - sliding-window features, shape (positions, channels)
#   input_b - auxiliary vector (interaction parameter + kernel values)

# Width of the auxiliary vector, taken from the prepared training array.
# This name was used below but never assigned anywhere.
auxInputDim = interaction_kernel_train.shape[1]

# shape must be a tuple; "(auxInputDim)" without the comma is a plain int.
input_b = Input(shape=(auxInputDim,))

# Named input_a so the builtin input() is no longer shadowed.
input_a = Input(shape=(32 - window_size, n_channel))

# The former input_shape=(29, n_channel) argument is dropped: the shape is
# already fixed by input_a, and the hard-coded 29 did not follow window_size.
x = Conv1D(29, 2, activation="relu")(input_a)
x = Flatten()(x)

# The concatenation is already 2-D (batch, features), so the second Flatten
# that used to follow it was a no-op.
x = Concatenate()([x, input_b])

x = Dense(units=300, activation='relu')(x)
x = Dense(units=110, activation='relu')(x)

# NOTE(review): a ReLU output clamps predictions at 0 and has zero gradient
# there; a linear output is the more common choice for regression. Left
# unchanged because switching would alter training results.
regression = Dense(units=1, activation='relu', name='regression')(x)

model = Model(inputs=[input_a, input_b], outputs=regression)

model.compile(loss='mse', optimizer=Adam(), metrics=['mse'])
model.summary()

In [None]:
# model training
# Both inputs are passed in the order the model declares them:
# window features first, auxiliary vector second.
epochs = 500
train_inputs = [X_train, interaction_kernel_train]
model.fit(
    train_inputs,
    y_train,
    batch_size=12,
    epochs=epochs,
    verbose=1,
)

In [None]:
# making prediction
# Score the held-out rows and report the mean squared error.
test_inputs = [X_test, interaction_kernel_test]
ypred_test = model.predict(test_inputs)
model_mse = mean_squared_error(y_test, ypred_test)
print("MSE: %.4f" % model_mse)

In [None]:
# calc R2
# r_squared is the squared Pearson correlation between predictions and
# targets (off-diagonal entry of the 2x2 correlation matrix, squared).
predicted_flat = ypred_test.flatten()
correlation_matrix = np.corrcoef(predicted_flat, y_test)
correlation_xy = correlation_matrix[0, 1]
r_squared = correlation_xy ** 2

In [None]:
# plot result
# Predicted-vs-actual scatter on square axes with the identity line drawn
# behind the points.
fig = plt.figure()
fig.patch.set_facecolor('white')
fig.patch.set_alpha(1)
ax = fig.add_subplot(111)

ax.set_title(f'MSE = {model_mse:.4f}.  Epochs = {epochs}.  R^2 = {r_squared:.4f}') # MSE
ax.scatter(ypred_test, y_test, s=5, color="blue") # x_axis, y_axis, ...
ax.set_xlabel("Predicted")
ax.set_ylabel("Actual")

# Use one common range for both axes so the identity line is the diagonal.
axis_ranges = [ax.get_xlim(), ax.get_ylim()]
lims = [np.min(axis_ranges), np.max(axis_ranges)]
ax.plot(lims, lims, 'k-', alpha=0.75, zorder=0)
ax.set_xlim(lims)
ax.set_ylim(lims)