In [None]:
import glob
import os

import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

## Import Image Data
> Save the NumPy-format images to a pickle file (`./tmp/image_data.pkl`).

In [None]:
pic_path = "./data"
label_list = glob.glob(os.path.join(pic_path, '*'))

label_list

In [None]:
# image_data = pd.DataFrame(columns=["image", "label"])
#
# for path, label in zip(label_list, label_list):
#     pic_arr = [np.load(os.path.join(path, path_itm)) for path_itm in os.listdir(path) if path_itm[-3:]=="npy"]
#     tmp_data = pd.DataFrame({
#         "image": pic_arr
#     })
#     tmp_data["label"] = label.split("/")[-1]
#
#     image_data = image_data.append(tmp_data)
#
# image_data

In [None]:
# image_data.to_pickle('tmp/image_data.pkl')

In [None]:
image_data = pd.read_pickle("tmp/image_data.pkl")
image_data

## Show Sample Image.

In [None]:
not_cataloged_sample = image_data[
    image_data["label"]=="No_Catalogue"].sample(n=9)["image"]

for index, pic in enumerate(not_cataloged_sample):
    plt.subplot(3, 3, index+1)
    plt.imshow(pic)
    plt.title("Not_Cataloged")
    plt.colorbar()
    plt.grid(False)

plt.subplots_adjust(wspace=0, hspace=0.8)
# plt.savefig(f"./pic/{"Not_Cataloged"}.png")

plt.tight_layout()
plt.show()

In [None]:
def fft_shift(img):
    """Return the centred magnitude spectrum of ``img``.

    The 2-D discrete Fourier transform of ``img`` is computed with
    ``np.fft.fft2``, its absolute value is taken, and ``np.fft.fftshift``
    moves the zero-frequency term to the centre of the array.

    Args:
        img: Array-like image accepted by ``np.fft.fft2``.

    Returns:
        Array of non-negative magnitudes with the same shape as ``img``.
    """
    magnitude = np.abs(np.fft.fft2(img))
    # The magnitudes are returned on a linear scale; no log is applied.
    return np.fft.fftshift(magnitude)


ffted_data = image_data.copy()
ffted_data["image"] = ffted_data["image"].apply(fft_shift)

ffted_data

In [None]:
min_number = ffted_data.label.value_counts().min()

ffted_data = pd.concat([ffted_data[ffted_data["label"] == "No_Catalogue"].sample(min_number),
                        ffted_data[ffted_data["label"] == "Catalogued"].sample(min_number)])

In [None]:
ffted_data.loc[:, "number_l"] = ffted_data.loc[:, "label"].apply(lambda x: x=="Catalogued")
ffted_data = ffted_data.astype({"number_l": int})
ffted_data.sample(frac=1)

In [None]:
ffted = ffted_data[
    ffted_data["label"]=="Catalogued"].sample(n=9)["image"]

for index, pic in enumerate(ffted):
    plt.subplot(3,3,index+1)
    plt.imshow(pic, cmap = 'gray')
    plt.title("Catalogued_FFT")
    plt.colorbar()
    plt.grid(False)

plt.subplots_adjust(wspace=0, hspace=0.8)
# plt.savefig(f"./pic/{"Catalogued"}.png")

plt.tight_layout()
plt.show()

## Balance the Number of Images per Class

In [None]:
min_number = image_data.label.value_counts().min()

fixed_data = pd.concat([image_data[image_data["label"] == "No_Catalogue"].sample(min_number),
                       image_data[image_data["label"] == "Catalogued"].sample(min_number)])

In [None]:
fixed_data.loc[:, "number_l"] = fixed_data.loc[:, "label"].apply(lambda x: x=="Catalogued")
fixed_data = fixed_data.astype({"number_l": int})
fixed_data.sample(frac=1)

In [None]:
fixed_data.dtypes

# CNN Model

In [None]:
import tensorflow.keras.layers as layer

In [None]:
BATCH_SIZE = 256
SHUFFLE_BUFFER_SIZE = 100
EPOCHS = 30

In [None]:
total_train_set = fixed_data.copy()
del total_train_set["label"]

In [None]:
# Shuffle the rows so images and labels are read in the same random order.
total_train_set = total_train_set.sample(frac=1)

# Stack the per-row images into a single array.
# dtype=float (not int) so fractional pixel values are not truncated before
# the Normalization layers are adapted; this matches get_sample() and the FFT
# pipeline, which both build their arrays with dtype=float.
tmp_array = np.array(total_train_set["image"].tolist(), dtype=float)
# Append a trailing channel axis: (N, H, W) -> (N, H, W, 1).
tmp_array = np.stack((tmp_array,), axis=-1)

In [None]:
total_dataset = tf.data.Dataset.from_tensor_slices((tmp_array, total_train_set["number_l"]))

### LeNet Model(No FFT)

In [None]:
# Normalization layer whose statistics are computed from tmp_array (the image
# stack built in the cells above) before the model is assembled.
ada_layer = tf.keras.layers.experimental.preprocessing.Normalization()
ada_layer.adapt(tmp_array)

# LeNet-style CNN for 32x32 single-channel images with a 2-way softmax output.
# The shapes in the comments below follow from the layer arguments for a
# (32, 32, 1) input.
# NOTE(review): conv2 uses 6 filters; the classic LeNet-5 uses 16 in its second
# convolution — confirm that this is intentional.
le_net_model = tf.keras.Sequential([
    layer.Input((32, 32, 1)),
    ada_layer,
    layer.ZeroPadding2D((1, 1)),  # 32x32 -> 34x34
    layer.Conv2D(6, (5, 5), strides=(1, 1), padding='valid', name='conv1'),  # -> 30x30x6
    layer.Activation('relu'),
    layer.MaxPooling2D((2, 2), strides=(2, 2)),  # -> 15x15x6
    layer.Conv2D(6, (5, 5), strides=(1, 1), padding='valid', name='conv2'),  # -> 11x11x6
    layer.Activation('relu'),
    layer.MaxPooling2D((2, 2), strides=(2, 2)),  # -> 5x5x6
    layer.Flatten(),  # -> 150 features
    layer.Dense(24, activation='relu', name='fc1'),
    layer.Dense(13, activation='relu', name='fc2'),
    layer.Dropout(0.2),
    layer.Dense(2, activation='softmax')  # one probability per class label (0 / 1)
])

In [None]:
le_net_model.compile(optimizer=tf.keras.optimizers.RMSprop(),
                     loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                     metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

In [None]:
le_net_model.summary()

### Simple NN(No FFT)

In [None]:
ada_layer = tf.keras.layers.experimental.preprocessing.Normalization()
ada_layer.adapt(tmp_array)

simple_nn_model = tf.keras.Sequential([
    layer.Input((32, 32, 1)),
    ada_layer,
    layer.Flatten(),
    layer.Dense(24, activation='relu', name='fc1'),
    layer.Dense(13, activation='relu', name='fc2'),
    layer.Dropout(0.2),
    layer.Dense(2, activation='softmax')
])

In [None]:
simple_nn_model.compile(optimizer=tf.keras.optimizers.RMSprop(),
                     loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                     metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

In [None]:
simple_nn_model.summary()

### LeNet (FFT)

In [None]:
fft_total_set = ffted_data.copy()
fft_total_set["number_l"] = fft_total_set["label"].apply(lambda x: x=="Catalogued")
del fft_total_set["label"]

fft_total_set = fft_total_set.astype({"number_l": int}).sample(frac=1)

In [None]:
tmp_array = []
fft_total_set["image"].apply(tmp_array.append)
tmp_array = np.array(tmp_array, dtype=float)
tmp_array = np.stack((tmp_array,), axis=-1)

In [None]:
fft_total_dataset = tf.data.Dataset.from_tensor_slices((tmp_array, fft_total_set["number_l"]))

In [None]:
fft_ada_layer = tf.keras.layers.experimental.preprocessing.Normalization()
fft_ada_layer.adapt(tmp_array)

fft_le_net_model = tf.keras.Sequential([
    layer.Input((32, 32, 1)),
    fft_ada_layer,
    layer.ZeroPadding2D((1, 1)),
    layer.Conv2D(6, (5, 5), strides=(1, 1), padding='valid', name='conv1'),
    layer.Activation('relu'),
    layer.MaxPooling2D((2, 2), strides=(2, 2)),
    layer.Conv2D(6, (5, 5), strides=(1, 1), padding='valid', name='conv2'),
    layer.Activation('relu'),
    layer.MaxPooling2D((2, 2), strides=(2, 2)),
    layer.Flatten(),
    layer.Dense(24, activation='relu', name='fc1'),
    layer.Dense(13, activation='relu', name='fc2'),
    layer.Dropout(0.2),
    layer.Dense(2, activation='softmax')
])

In [None]:
# Compile with the same optimizer, loss and metric as the other models in this
# notebook. Eager execution is no longer forced (run_eagerly=True removed):
# it disables graph compilation and slows training, and none of the other
# models, including the identically structured non-FFT LeNet, need it.
fft_le_net_model.compile(optimizer=tf.keras.optimizers.RMSprop(),
                         loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                         metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

In [None]:
fft_le_net_model.summary()

### Simple NN (with FFT) model

In [None]:
fft_ada_layer = tf.keras.layers.experimental.preprocessing.Normalization()
fft_ada_layer.adapt(tmp_array)

fft_simple_nn_model = tf.keras.Sequential([
    layer.Input((32, 32, 1)),
    fft_ada_layer,
    layer.Flatten(),
    layer.Dense(24, activation='relu', name='fc1'),
    layer.Dense(13, activation='relu', name='fc2'),
    layer.Dropout(0.2),
    layer.Dense(2, activation='softmax')
])

In [None]:
fft_simple_nn_model.compile(optimizer=tf.keras.optimizers.RMSprop(),
                         loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                         metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

In [None]:
fft_simple_nn_model.summary()

### ResNet50 Model

In [None]:
# ResNet50 backbone trained from scratch (weights=None) on 32x32 three-channel
# input. With include_top=False and pooling="avg" the backbone returns a pooled
# feature vector, not class probabilities.
resnet50_backbone = tf.keras.applications.ResNet50(weights=None,
                                                   include_top=False,
                                                   input_shape=(32, 32, 3),
                                                   pooling="avg")

# Add a 2-way softmax head so the output matches the 0/1 labels and the
# SparseCategoricalCrossentropy loss the model is compiled with.
resnet50_model = tf.keras.Sequential([
    resnet50_backbone,
    tf.keras.layers.Dense(2, activation="softmax"),
])

In [None]:
resnet50_model.compile(optimizer=tf.keras.optimizers.RMSprop(),
                        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

## Test

## Pre-process

In [None]:
ffted_data_random = ffted_data.sample(frac=1)
fixed_data_random = fixed_data.sample(frac=1)
fixed_data_random

### Data without FFT

In [None]:
def get_sample(df: pd.DataFrame, number: int = 1, rgb: str = "1d"):
    """Build batched train/test ``tf.data`` datasets from a labelled frame.

    A random ``1/number`` fraction is drawn separately from the rows with
    ``number_l == 1`` and from the rows with ``number_l == 0``. The two samples
    are concatenated, shuffled and split into two halves: the first half is
    the training set, the second half the test set. Split sizes and the class
    counts of the training half are printed.

    Args:
        df: Frame with an ``image`` column (one array per row) and a
            ``number_l`` column holding the 0/1 label.
        number: Divisor of the sampling fraction; each class keeps
            ``1/number`` of its rows.
        rgb: ``"1d"`` appends one trailing channel axis, ``"3d"`` replicates
            the image into three trailing channels. Any other value leaves
            the stacked array without an added channel axis.

    Returns:
        ``(train_dataset, test_dataset)``: datasets of ``(image, label)``
        pairs batched with ``BATCH_SIZE``; only the training dataset is
        shuffled (buffer size ``SHUFFLE_BUFFER_SIZE``).
    """
    fraction = 1 / number
    positives = df[df["number_l"] == 1].sample(frac=fraction)
    negatives = df[df["number_l"] == 0].sample(frac=fraction)
    print(str(min(len(positives), len(negatives))))

    # Mix both classes before the sequential train/test split below.
    shuffled = pd.concat((positives, negatives)).sample(frac=1)

    values = np.array(shuffled["image"].tolist(), dtype=float)
    channel_count = {"1d": 1, "3d": 3}.get(rgb)
    if channel_count is not None:
        values = np.stack((values,) * channel_count, axis=-1)

    labels = shuffled["number_l"].to_numpy(copy=True)

    # First half -> training, second half -> test.
    train_images, test_images = np.array_split(values, 2)
    train_labels, test_labels = np.array_split(labels, 2)

    print(" train_dataset shape = " + str(train_images.shape[0]) +
          "\n train_set_labels shape = " + str(train_labels.shape[0]) +
          "\n test_set_val shape = " + str(test_images.shape[0]) +
          "\n test_set_labels shape = " + str(test_labels.shape[0]))

    zero_count = np.count_nonzero(train_labels == 0)
    one_count = np.count_nonzero(train_labels == 1)
    print("count[0]: " + str(zero_count) +
          "\n count[1]: " + str(one_count))

    train_dataset = (
        tf.data.Dataset.from_tensor_slices((train_images, train_labels))
        .shuffle(SHUFFLE_BUFFER_SIZE)
        .batch(BATCH_SIZE)
    )
    test_dataset = (
        tf.data.Dataset.from_tensor_slices((test_images, test_labels))
        .batch(BATCH_SIZE)
    )

    return train_dataset, test_dataset


In [None]:
train_dataset_max_1d, test_dataset_max_1d = get_sample(fixed_data, 2)
len(train_dataset_max_1d)

### Data with FFT

In [None]:
fft_random_values = []
ffted_data_random["image"].apply(fft_random_values.append)
fft_random_values = np.array(fft_random_values, dtype=float)
fft_random_values_1d = np.stack((fft_random_values,), axis=-1)
fft_random_values_3d = np.stack((fft_random_values,)*3, axis=-1)
fft_random_values_3d

In [None]:
ffted_data_random.dtypes

In [None]:
fft_random_lables = np.array(ffted_data_random["number_l"].copy())
fft_random_lables

## Test

### LeNet (without FFT)

In [None]:
train_dataset, test_dataset = get_sample(fixed_data_random)

fixed_data_random

In [None]:
result = le_net_model.fit(train_dataset, epochs=EPOCHS,
                          validation_data=test_dataset)

In [None]:
result.history

In [None]:
# Sweep over training-set size: "max" is the full-data run from the previous
# cell; keys "2".."10" use a 1/2 .. 1/10 fraction of each class (see get_sample).
# NOTE(review): the same le_net_model instance is fit again on every iteration
# without being rebuilt, so each run presumably continues from the weights left
# by the previous one, and get_sample() re-splits train/test randomly each time
# (earlier training samples can land in a later validation split). Confirm this
# is intended; otherwise re-create the model per run.
le_net_result_dict = {"max": result}
for times in range(1,10):
    train_dataset, test_dataset = get_sample(fixed_data_random, times + 1)

    result = le_net_model.fit(train_dataset, epochs=EPOCHS,
                              validation_data=test_dataset)

    # Key is the divisor passed to get_sample, stored as a string.
    le_net_result_dict[str(times + 1)] = result

In [None]:
def save_results(result):
    """Reduce a mapping of training results to plain ``params``/``history``/``epoch`` dicts.

    Each value of ``result`` is read by subscription (``entry["params"]``);
    if that raises ``TypeError`` (the entry is not subscriptable, e.g. an
    object exposing the fields as attributes) the attributes ``.params``,
    ``.history`` and ``.epoch`` are read instead.

    Args:
        result: Mapping from run name to either a dict-like entry or an
            object with ``params``, ``history`` and ``epoch`` attributes.

    Returns:
        dict mapping each run name to
        ``{"params": ..., "history": ..., "epoch": ...}``.
    """
    fields = ("params", "history", "epoch")

    def _extract(entry):
        # Prefer subscription; fall back to attributes for non-subscriptable entries.
        try:
            return {key: entry[key] for key in fields}
        except TypeError:
            return {key: getattr(entry, key) for key in fields}

    return {name: _extract(result[name]) for name in result}

In [None]:
le_net_result = pd.Series(save_results(le_net_result_dict))
le_net_result

In [None]:
# Create the output directory if it is missing; to_pickle/to_csv do not create
# parent directories and would fail on a fresh checkout.
os.makedirs("./ml_result_data", exist_ok=True)
le_net_result.to_pickle("./ml_result_data/lenet_nofft.pkl")
le_net_result.to_csv("./ml_result_data/lenet_nofft.csv")

### LeNet (with FFT)

In [None]:
train_dataset, test_dataset = get_sample(ffted_data)
result = fft_le_net_model.fit(train_dataset, epochs=EPOCHS,
                              validation_data=test_dataset)
result.history

In [None]:
# Sweep over training-set size for the FFT LeNet: "max" is the full-data run
# from the previous cell; keys "2".."10" use a 1/2 .. 1/10 fraction of each class.
# NOTE(review): the same fft_le_net_model instance is fit again on every
# iteration without being rebuilt, so each run presumably continues from the
# previous weights, and get_sample() re-splits train/test randomly each time.
# Confirm this is intended; otherwise re-create the model per run.
fft_le_net_result_dict = {"max": result}
for times in range(1,10):
    train_dataset, test_dataset = get_sample(ffted_data, times + 1)

    result = fft_le_net_model.fit(train_dataset, epochs=EPOCHS,
                                  validation_data=test_dataset)

    # Key is the divisor passed to get_sample, stored as a string.
    fft_le_net_result_dict[str(times + 1)] = result

In [None]:
# Flatten the training results into plain dicts and persist them.
fft_le_net_result = pd.Series(save_results(fft_le_net_result_dict))
# Create the output directory if it is missing; to_pickle/to_csv do not create it.
os.makedirs("./ml_result_data", exist_ok=True)
fft_le_net_result.to_pickle("./ml_result_data/lenet_fft.pkl")
fft_le_net_result.to_csv("./ml_result_data/lenet_fft.csv")

fft_le_net_result

### NN without fft.

In [None]:
train_dataset, test_dataset = get_sample(fixed_data)
result = simple_nn_model.fit(train_dataset, epochs=EPOCHS,
                             validation_data=test_dataset)
result.history

In [None]:
# Sweep over training-set size for the simple NN: "max" is the full-data run
# from the previous cell; keys "2".."10" use a 1/2 .. 1/10 fraction of each class.
# NOTE(review): the same simple_nn_model instance is fit again on every
# iteration without being rebuilt, so each run presumably continues from the
# previous weights, and get_sample() re-splits train/test randomly each time.
# Confirm this is intended; otherwise re-create the model per run.
nn_result_dict = {"max": result}
for times in range(1,10):
    train_dataset, test_dataset = get_sample(fixed_data, times + 1)

    result = simple_nn_model.fit(train_dataset, epochs=EPOCHS,
                                  validation_data=test_dataset)

    # Key is the divisor passed to get_sample, stored as a string.
    nn_result_dict[str(times + 1)] = result

In [None]:
# Flatten the training results into plain dicts and persist them.
nn_result = pd.Series(save_results(nn_result_dict))
# Create the output directory if it is missing; to_pickle/to_csv do not create it.
os.makedirs("./ml_result_data", exist_ok=True)
nn_result.to_pickle("./ml_result_data/nn_nofft.pkl")
nn_result.to_csv("./ml_result_data/nn_nofft.csv")

nn_result

### NN with fft.

In [None]:
train_dataset, test_dataset = get_sample(ffted_data)
result = fft_simple_nn_model.fit(train_dataset, epochs=EPOCHS,
                             validation_data=test_dataset)
result.history

In [None]:
# Sweep over training-set size for the FFT simple NN: "max" is the full-data run
# from the previous cell; keys "2".."10" use a 1/2 .. 1/10 fraction of each class.
# NOTE(review): the same fft_simple_nn_model instance is fit again on every
# iteration without being rebuilt, so each run presumably continues from the
# previous weights, and get_sample() re-splits train/test randomly each time.
# Confirm this is intended; otherwise re-create the model per run.
fft_nn_result_dict = {"max": result}
for times in range(1,10):
    train_dataset, test_dataset = get_sample(ffted_data, times + 1)

    result = fft_simple_nn_model.fit(train_dataset, epochs=EPOCHS,
                                 validation_data=test_dataset)

    # Key is the divisor passed to get_sample, stored as a string.
    fft_nn_result_dict[str(times + 1)] = result

In [None]:
# Flatten the training results into plain dicts and persist them.
fft_nn_result = pd.Series(save_results(fft_nn_result_dict))
# Create the output directory if it is missing; to_pickle/to_csv do not create it.
os.makedirs("./ml_result_data", exist_ok=True)
fft_nn_result.to_pickle("./ml_result_data/nn_fft.pkl")
fft_nn_result.to_csv("./ml_result_data/nn_fft.csv")

fft_nn_result

### ResNet50

In [None]:
# train_dataset, test_dataset = get_sample(ffted_data, rgb="3d")
# result = resnet50_model.fit(train_dataset, epochs=EPOCHS,
#                             validation_data=test_dataset)
# result.history

## High epochs test

In [None]:
result_list = {}

### Simple NN without fft.
epochs = 400

In [None]:
train_dataset, test_dataset = get_sample(fixed_data)
result = simple_nn_model.fit(train_dataset, epochs=400,
                             validation_data=test_dataset)
result.history

In [None]:
result_list["nn_without_fft"] = result

### Simple NN with fft.
epochs = 400

In [None]:
train_dataset, test_dataset = get_sample(ffted_data)
result = fft_simple_nn_model.fit(train_dataset, epochs=400,validation_data=test_dataset)
result.history

In [None]:
result_list["nn_with_fft"] = result

### Simple NN without FFT

### LeNet with fft.
epochs = 400

In [None]:
train_dataset, test_dataset = get_sample(ffted_data)
result = fft_le_net_model.fit(train_dataset, epochs=400,validation_data=test_dataset)
result.history

In [None]:
result_list["le_net_with_fft"] = result

### LeNet without fft.
epochs = 400

In [None]:
train_dataset, test_dataset = get_sample(fixed_data)
result = le_net_model.fit(train_dataset, epochs=400,validation_data=test_dataset)
result.history

In [None]:
result_list["le_net_without_fft"] = result

In [None]:
result_list

In [None]:
result_save = save_results(result_list)

In [None]:
pd_result = pd.Series(result_save)
pd_result.to_pickle("high_epochs.pkl")
pd_result.to_csv("high_epochs.csv")
pd_result

# Final Model Export

In [None]:
train_dataset, test_dataset = get_sample(ffted_data)
result = fft_le_net_model.fit(train_dataset, epochs=25,validation_data=test_dataset)
result.history

In [None]:
fft_le_net_model.summary()

In [None]:
fft_le_net_model.save_weights("./model/final-weights")

In [None]:
fft_le_net_model.save("./model/final-model")

In [None]:
tf.saved_model.save(fft_le_net_model, "./model/final-model-saver")

# Load Test

In [None]:
load_model = tf.keras.models.load_model("./model/final-model")
load_model.summary()

In [None]:
ffted_data

In [None]:
# Stack the per-row FFT images into one float array of shape (N, H, W).
load_test_data = np.array(ffted_data["image"].tolist(), dtype=float)
# Append the trailing channel axis, (N, H, W) -> (N, H, W, 1), so the input
# matches the model's (32, 32, 1) input shape and the layout that
# get_sample() feeds during training.
load_test_data = np.stack((load_test_data,), axis=-1)

load_test_data

In [None]:
test_result = ffted_data.copy()
test_result_list = load_model.predict(load_test_data)

In [None]:
test_result_list

In [None]:
test_result["predict_sig"] = list(zip(*test_result_list))[1]
test_result["predict_noise"] = list(zip(*test_result_list))[0]

In [None]:
test_result["predict"] = test_result["predict_sig"] > test_result["predict_noise"]
test_result["accuracy"] = test_result["predict"] == test_result["number_l"]

test_result

In [None]:
test_result["accuracy"].value_counts()

In [None]:
test_result[test_result["number_l"] == 1]["accuracy"].value_counts()

In [None]:
test_result[test_result["number_l"] == 0]["accuracy"].value_counts()