In [None]:
import pandas as pd
import numpy as np
import os
from PIL import Image
import cv2
import matplotlib.pyplot as plt
import sklearn
import time
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, cohen_kappa_score, mean_squared_error, mean_absolute_error
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.svm import SVC, SVR
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.cluster import KMeans
from xgboost import XGBClassifier, XGBRegressor

import tensorflow as tf
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Conv2D, MaxPooling2D, UpSampling2D, Input, Rescaling, BatchNormalization, Reshape, Flatten
from tensorflow.keras.utils import image_dataset_from_directory
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard

#os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# Anemia prediction (Classification & Regression)

In [None]:
# Task selector read by later cells: "classification" builds class labels from hgb,
# "regression" uses the hgb value itself as the target.
mode = "classification"
#mode = "regression"
# Classification only: True -> two classes (hgb < 12.5 is the positive class),
# False -> classes derived from `threshold` via multi_class_label().
binary = True
# Ascending hgb cut-offs for the multi-class labels: a value gets class i when
# threshold[i] is the first cut-off it is below, and class len(threshold) otherwise.
threshold = [7.0, 10.0, 12.5]
# Display names of the len(threshold) + 1 classes, in class-index order.
threshold_name = ["severely anemic", "moderately anemic", "mildly anemic", "non-anemic"]

## Load labels

In [None]:
# NOTE(review): hard-coded, machine-specific location. On Windows, os.path.join("D:", "x")
# produces the drive-relative path "D:x" (relative to the current directory on drive D:),
# not "D:\\x" -- confirm this is what is intended before moving the notebook.
label_path = os.path.join("D:", "OneDrive_1_5-26-2022", "PredictingAnemia_DATA_2022-06-05_0643.csv")
# Raw label table; the visible cells only read its "hgb" column.
label = pd.read_csv(label_path)

In [None]:
# Coerce hgb to numbers; anything that cannot be parsed becomes NaN and is dropped below.
hgb_numeric = pd.to_numeric(label["hgb"], errors="coerce")
label["hgb"] = hgb_numeric

# Positions of the unusable rows (np.where returns a 1-tuple of index arrays).
drop_index = np.where(hgb_numeric.isna())
print("drop (contains string or null): ", drop_index[0])
label = label.drop(index=drop_index[0])

# Summary statistics of the remaining hgb values, overall and on either side of 12.5.
hgb_clean = label["hgb"]
print("mean:", hgb_clean.mean(), "std:", hgb_clean.std())
print("anemia mean: ", hgb_clean[hgb_clean < 12.5].mean())
print("non-anemia mean: ", hgb_clean[hgb_clean >= 12.5].mean())

In [None]:
def multi_class_label(label_i, threshold):
    """Map a single value to the index of the first cut-off it is strictly below.

    Parameters
    ----------
    label_i : number
        Value to classify.
    threshold : sequence of numbers
        Cut-offs, checked in the given order.

    Returns
    -------
    int
        Index ``i`` of the first ``threshold[i]`` with ``label_i < threshold[i]``;
        ``len(threshold)`` when no cut-off is larger than ``label_i``.
    """
    return next(
        (class_idx for class_idx, cutoff in enumerate(threshold) if label_i < cutoff),
        len(threshold),
    )

# Build the target series `y`, indexed like the cleaned label table.
if mode == "classification":
    if binary:
        # Positive class (1): hgb strictly below 12.5.
        y = label["hgb"].lt(12.5).astype(int)
    else:
        # One class index per row, derived from the cut-offs in `threshold`.
        y = pd.Series(
            data=np.array(
                [multi_class_label(hgb_value, threshold) for hgb_value in label["hgb"]],
                dtype=np.uint8,
            ),
            index=label["hgb"].index,
        )
elif mode == "regression":
    # The raw hgb measurement is the regression target.
    y = label["hgb"]
print(y.index)

In [None]:
# Collect the subject ids that have both an image folder and a usable label.
# Folder "N" is matched against label row index N - 1.
y_available = []

for folder in os.listdir("./detected eyes images"):
    try:
        subject_id = int(folder)
    except ValueError:
        # Stray entries that are not numeric subject folders (e.g. hidden/system files)
        # used to abort the whole cell; skip them instead.
        continue
    if subject_id - 1 in y.index:
        y_available.append(subject_id)

print("not available id: ")
# Set lookup avoids rescanning the list for every candidate id.
available_lookup = set(y_available)
# NOTE(review): ids are assumed to run from 1 to 692 -- confirm against the dataset.
not_available_id = [i for i in range(1, 693) if i not in available_lookup]
print(not_available_id)
print("num: ", len(not_available_id))

## Load images

In [None]:
# Load every image of every available subject, resized to 224x224, together with
# the subject's target value and id (one entry per image, all three lists aligned).
x_img = []
y_img = []
y_id = []

for subject_id in y_available:
    subject_dir = os.path.join("./detected eyes images", str(subject_id))
    for image in os.listdir(subject_dir):
        image_path = os.path.join(subject_dir, image)
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals an unreadable / non-image file by returning None;
            # passing that on to the resize would abort the whole load.
            print("skip unreadable image: ", image_path)
            continue
        x_img.append(tf.image.resize(img, (224, 224)))
        # Explicit label-based lookup: folder N corresponds to label row index N - 1.
        y_img.append(y.loc[subject_id - 1])
        y_id.append(subject_id)

# Resized tensors are float; casting to uint8 here truncates the fractional part.
x_img = np.array(x_img, dtype=np.uint8)
y_img = np.array(y_img)
print(x_img.shape, y_img.shape)

In [None]:
# np.savez("img_original_224_224_3.npz", x_img=x_img, y_id=y_id)

In [None]:
# load_file = np.load("img_original_224_224_3.npz")
# x_img, y_id = load_file["x_img"], load_file["y_id"]
# print(x_img.shape, y_id.shape)

## Data preprocessing

### U-Net and autoencoder

In [None]:
# Load the pre-computed histogram features stored under key "x".
# NOTE(review): the only code that writes a file with this name is the commented-out
# np.savez_compressed call further down this notebook -- the file must already exist
# for a fresh run to succeed.
# The context manager closes the .npz archive once the array has been read.
with np.load('./x_hsv_hist.npz') as hist_archive:
    x_hsv_hist = hist_archive["x"]

# Rescale every feature column to [0, 1]; `scaler` is kept so the transform can be inverted later.
scaler = MinMaxScaler()
x_hsv_hist = scaler.fit_transform(x_hsv_hist)

In [None]:
def preprocess(array):
    """
    Divide the array by 255 and return it as a float32 batch of shape
    (len(array), 224, 224, 3).
    """
    batch_size = len(array)
    scaled = array.astype(np.float32) / 255.0
    # The trailing astype keeps the result an independent float32 array.
    return scaled.reshape(batch_size, 224, 224, 3).astype(np.float32)

def noise(array):
    """
    Return a copy of the array with Gaussian noise (standard normal scaled by 0.4)
    added element-wise, clipped to the range [0, 1].
    """
    noise_factor = 0.4
    gaussian = np.random.normal(loc=0.0, scale=1.0, size=array.shape)
    return np.clip(array + noise_factor * gaussian, 0.0, 1.0)

def display(array1, array2):
    """
    Show the same ten randomly chosen images from both arrays: array1 in the top
    row and array2 in the bottom row, with the first and third channel swapped
    for plotting.
    """
    n = 10
    indices = np.random.randint(len(array1), size=n)

    # Swap channel 0 and 2 so matplotlib receives the channel order it expects
    # (the images are presumably BGR, as loaded through OpenCV).
    images1 = array1[indices][..., [2, 1, 0]]
    images2 = array2[indices][..., [2, 1, 0]]

    fig, axes = plt.subplots(2, n, figsize=(20, 4))
    for col, (image1, image2) in enumerate(zip(images1, images2)):
        for ax, image in ((axes[0, col], image1), (axes[1, col], image2)):
            ax.imshow(image)
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)

    plt.show()

# float32 copy of the loaded images, divided by 255 and shaped (N, 224, 224, 3).
x_img_preprocess = preprocess(x_img)
# Noisy variant of the same images; in the visible cells it is only passed to display().
x_img_noise = noise(x_img_preprocess)

In [None]:
class Vgg16(tf.keras.Model):
    """VGG16 backbone (no classification head) that returns five intermediate feature maps.

    NOTE(review): ``pretrained`` is accepted but never read -- the network is always
    created with ``weights=None``, i.e. randomly initialised.
    """

    def __init__(self, pretrained = True):
        super().__init__()
        self.vggnet = tf.keras.applications.VGG16(weights=None, include_top=False)

    def call(self, x):
        """Run ``x`` through every VGG16 layer in order and collect the tapped outputs.

        Returns a list with the outputs of the layers at indices 2, 5, 9, 13 and 17,
        in that order.
        """
        # Original author's note on the tapped shapes, written as (channels, H, W):
        # (64,256,256),(128,128,128),(256,64,64),(512,32,32),(512,16,16)
        # -- presumably for a 256x256 input; the model below is built for 224x224.
        tap_indices = (2, 5, 9, 13, 17)
        results = []
        for layer_idx, layer in enumerate(self.vggnet.layers):
            x = layer(x)
            if layer_idx in tap_indices:
                results.append(x)
        return results

# Instantiate the feature extractor and build it for batches of 224x224x3 images
# so that summary() can report its layers.
vgg_model = Vgg16()
vgg_model.build(input_shape=(None, 224, 224, 3))
vgg_model.summary()

class DeConv2d(tf.keras.layers.Layer):
    """2x nearest-neighbour upsampling followed by a Conv2D.

    ``in_channel`` is accepted for signature symmetry but not used; the convolution
    is configured from ``out_channel``, ``kernel_size``, ``stride``, ``padding``
    and ``dilation`` only.
    """

    def __init__(self, in_channel, out_channel, kernel_size, stride, padding, dilation):
        super().__init__()
        self.up = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation='nearest')
        self.conv = tf.keras.layers.Conv2D(
            filters=out_channel,
            kernel_size=kernel_size,
            strides=stride,
            padding=padding,
            dilation_rate=dilation,
        )

    def call(self, x):
        """Upsample ``x`` by a factor of two, then convolve it."""
        return self.conv(self.up(x))

class UNet(tf.keras.Model):
    """U-Net style decoder on top of a feature extractor that returns five feature maps.

    Parameters
    ----------
    pretrained_net : callable
        Model returning a list of five feature maps, index 0 being the first tap
        and index 4 the last one.
    n_class : int
        Number of output channels of the final 1x1 sigmoid convolution.
    """

    def __init__(self, pretrained_net, n_class):
        super().__init__()
        self.n_class = n_class
        self.pretrained_net = pretrained_net

        # Shared activation and the four upsample -> conv -> ReLU -> BatchNorm stages.
        deconv_kwargs = dict(kernel_size=3, stride=1, padding="same", dilation=1)
        self.relu = tf.keras.layers.ReLU()
        self.deconv1 = DeConv2d(512, 512, **deconv_kwargs)
        self.bn1 = tf.keras.layers.BatchNormalization()

        self.deconv2 = DeConv2d(1024, 256, **deconv_kwargs)
        self.bn2 = tf.keras.layers.BatchNormalization()

        self.deconv3 = DeConv2d(512, 128, **deconv_kwargs)
        self.bn3 = tf.keras.layers.BatchNormalization()

        self.deconv4 = DeConv2d(256, 64, **deconv_kwargs)
        self.bn4 = tf.keras.layers.BatchNormalization()

        self.classifier = tf.keras.layers.Conv2D(n_class, kernel_size=1, activation="sigmoid")

    def call(self, x):
        """Decode the deepest feature map, concatenating one skip connection per stage."""
        feats = self.pretrained_net(x)
        stages = (
            (self.deconv1, self.bn1),
            (self.deconv2, self.bn2),
            (self.deconv3, self.bn3),
            (self.deconv4, self.bn4),
        )

        decoded = feats[4]
        # After each stage the result is concatenated (channel axis) with the next
        # shallower feature map: feats[3], feats[2], feats[1], feats[0].
        for skip_level, (deconv, bn) in zip((3, 2, 1, 0), stages):
            decoded = bn(self.relu(deconv(decoded)))
            decoded = tf.concat([decoded, feats[skip_level]], axis=-1)

        return self.classifier(decoded)
        
# Build the U-Net on top of the VGG16 feature extractor; n_class=3 output channels
# match the 3-channel image used as the target in fit() below.
seg_model = UNet(pretrained_net=vgg_model, n_class=3)
seg_model.compile(optimizer='adam', loss='binary_crossentropy')
seg_model.build(input_shape=(None, 224, 224, 3))
seg_model.summary()

# Input and target are the same clean images, i.e. the network learns to reconstruct its input.
# NOTE(review): x_img_noise is never fed to the model here -- confirm whether noisy
# inputs were intended for a denoising setup.
seg_model.fit(
    x=x_img_preprocess,
    y=x_img_preprocess,
    epochs=100,
    batch_size=16,
    shuffle=True,
)

# Despite its name, x_img_denoise holds the model's reconstruction of the clean images.
x_img_denoise = seg_model.predict(x_img_preprocess, batch_size=16)
# First figure: clean vs. noisy images; second figure: clean vs. reconstructed images.
display(x_img_preprocess, x_img_noise)
display(x_img_preprocess, x_img_denoise)

In [None]:
class Autoencoder(tf.keras.Model):
    """Small convolutional autoencoder that also emits a 256-d embedding.

    The model takes ``[images, histograms]`` and returns ``(embeddings, reconstruction)``,
    where the embedding is the histogram input plus the globally average-pooled
    encoder output.
    """

    def __init__(self):
        super().__init__()
        # Encoder
        self.conv1 = Conv2D(256, (3, 3), activation='relu', padding='same')
        self.max1 = MaxPooling2D((2, 2), padding='same')
        self.batch1 = BatchNormalization()
        self.conv2 = Conv2D(256, (3, 3), activation='relu', padding='same')
        self.max2 = MaxPooling2D((2, 2), padding='same')

        # Embeddings (the dropout layer is created but not applied in call())
        self.ave1 = GlobalAveragePooling2D()
        self.dropout = Dropout(0.1)

        # Decoder
        self.conv3 = Conv2D(256, (3, 3), activation='relu', padding='same')
        self.up3 = UpSampling2D((2, 2))
        self.conv4 = Conv2D(256, (3, 3), activation='relu', padding='same')
        self.up4 = UpSampling2D((2, 2))
        self.conv5 = Conv2D(3, (3, 3), activation='sigmoid', padding='same')

    def call(self, x, training):
        """Encode the images, form the embedding, and decode a 3-channel reconstruction.

        ``x`` is a pair ``(images, histograms)``. ``training`` is accepted but not
        forwarded explicitly to any sub-layer here.
        """
        images, hist = x

        encoded = self.max1(self.conv1(images))
        encoded = self.batch1(encoded)
        encoded = self.max2(self.conv2(encoded))

        # Disabled alternative kept from the original:
        #embeddings = self.dropout(x_hist, training=training) + self.ave1(x)
        embeddings = hist + self.ave1(encoded)

        decoded = self.up3(self.conv3(encoded))
        decoded = self.up4(self.conv4(decoded))
        reconstruction = self.conv5(decoded)
        return embeddings, reconstruction

# Build the two-input autoencoder: a batch of 224x224x3 images and a batch of 256-d histograms.
autoencoder = Autoencoder()
autoencoder.build(input_shape=[(None, 224, 224, 3), (None, 256)])
autoencoder.summary()
optimizer = tf.keras.optimizers.Adam()
# Reconstruction loss (images) and embedding loss (histograms), each with a running-mean tracker.
bce_loss_fn = tf.keras.losses.BinaryCrossentropy()
mse_loss_fn = tf.keras.losses.MeanSquaredError()
bce_loss_metric = tf.keras.metrics.Mean()
mse_loss_metric = tf.keras.metrics.Mean()

# Images and histogram features are sliced together, so row k of both belongs to the same sample.
train_dataset = tf.data.Dataset.from_tensor_slices({"img": x_img_preprocess, "hist": x_hsv_hist})
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(16)

epochs = 10
for epoch in range(epochs):
    #print("Start of epoch %d" % (epoch,))
    start_time = time.time()
    # Metrics are per-epoch averages, so clear them at the start of every epoch.
    bce_loss_metric.reset_state()
    mse_loss_metric.reset_state()

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        x_batch_img, x_batch_hsv_hist = x_batch_train["img"], x_batch_train["hist"]
        with tf.GradientTape() as tape:
            embeddings, reconstructed = autoencoder([x_batch_img, x_batch_hsv_hist], training=True)
            # Compute reconstruction loss
            # Total loss = image BCE + 10 x MSE between the histogram input and the embedding.
            loss = bce_loss_fn(x_batch_img, reconstructed) + mse_loss_fn(x_batch_hsv_hist, embeddings) * 10

        grads = tape.gradient(loss, autoencoder.trainable_weights)
        optimizer.apply_gradients(zip(grads, autoencoder.trainable_weights))

        # Both loss terms are evaluated a second time here, only for logging.
        bce_loss_metric.update_state(bce_loss_fn(x_batch_img, reconstructed))
        mse_loss_metric.update_state(mse_loss_fn(x_batch_hsv_hist, embeddings) * 10)
        
    print("epoch %d: bce mean loss = %.4f, mse mean loss = %.4f, elapsed time: %ds" % (epoch, bce_loss_metric.result(), mse_loss_metric.result(), time.time()-start_time))

# Inference pass over the same data the model was trained on, unshuffled so outputs stay
# aligned with x_img_preprocess.
test_dataset = tf.data.Dataset.from_tensor_slices({"img": x_img_preprocess, "hist": x_hsv_hist})
test_dataset = test_dataset.batch(16)
    
x_img_denoise = None
x_embeddings = None
for x_batch_test in test_dataset:
    x_batch_img, x_batch_hsv_hist = x_batch_test["img"], x_batch_test["hist"]
    output_embeddings, output_x = autoencoder([x_batch_img, x_batch_hsv_hist], training=False)
    if x_img_denoise is None:
        # First batch initialises the accumulators.
        x_embeddings, x_img_denoise = output_embeddings, output_x
    else:
        # Later batches are appended along the sample axis (re-copies the accumulated
        # array on every step).
        x_embeddings = np.concatenate((x_embeddings, output_embeddings), axis=0)
        x_img_denoise = np.concatenate((x_img_denoise, output_x), axis=0)
    
# x_img_denoise overwrites any value produced by the U-Net cell; it holds the reconstructions.
x_img_denoise = np.array(x_img_denoise, dtype=np.float32)
display(x_img_preprocess, x_img_noise)
display(x_img_preprocess, x_img_denoise)

In [None]:
# Replace the loaded images with the reconstructions, scaled by 255 and cast to uint8
# (the cast truncates). After this cell `x_img` no longer holds the original images.
x_img = (x_img_denoise * 255.0).astype(np.uint8)
# x_embeddings = scaler.inverse_transform(x_embeddings)
# x_embeddings = np.array(x_embeddings, dtype=np.uint8)

#print(x_embeddings[0])

### Mask (Segmentation output)

In [None]:
# Default: no segmentation mask. The next cell sets this flag to True when it is run;
# the HSV-filter cell later branches on it.
use_mask = False

In [None]:
use_mask = True

# Load the stored masks, add a trailing channel axis and resize each one to 224x224.
mask_data = np.load("./mask autoencoder/mask_autoencoder_original_224_224_3.npz")
x_mask_raw = mask_data['x_mask'][..., np.newaxis]
x_mask = np.array(
    [tf.image.resize(raw_mask, (224, 224)) for raw_mask in x_mask_raw],
    dtype=np.uint8,
)
print(mask_data["y_id"])

# Preview ten samples: images on the top row, their masks on the bottom row.
preview = slice(1280, 1290)
plt.figure(figsize=(20, 4))
for i, (x_img_i, x_mask_i) in enumerate(zip(x_img[preview], x_mask[preview])):
    # Swap channel 0 and 2 for plotting.
    x_img_i_rgb = x_img_i[:, :, [2, 1, 0]]

    for row_offset, picture, extra in ((0, x_img_i_rgb, {}), (10, x_mask_i, {"cmap": 'gray'})):
        ax = plt.subplot(2, 10, i + 1 + row_offset)
        plt.imshow(picture, **extra)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

plt.show()

In [None]:
# Weight every pixel by its mask value scaled to [0, 1] (the single mask channel is
# broadcast over the colour channels), then cast back to uint8.
x_img_masked = (x_img * (x_mask / 255.0)).astype(np.uint8)

# Preview the same ten samples shown for the raw masks.
preview = slice(1280, 1290)
plt.figure(figsize=(20, 4))
for i, x_img_i in enumerate(x_img_masked[preview]):
    ax = plt.subplot(1, 10, i + 1)
    # Swap channel 0 and 2 for plotting.
    plt.imshow(x_img_i[:, :, [2, 1, 0]])
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

plt.show()

print(y_img[preview])

In [None]:
# Cluster
# Find centroids

def clustering_slice(original_img, mask_img, n_clusters=2):
    """Crop ``original_img`` to the bounding box of one k-means cluster of ``mask_img``.

    The pixels of ``mask_img`` are clustered by colour; the bounding box of the pixels
    assigned to cluster index 1 is cut out of ``original_img``.

    Parameters
    ----------
    original_img : ndarray, shape (H, W, 3)
        Image to crop.
    mask_img : ndarray with H * W * 3 elements
        Image whose pixel colours are clustered.
    n_clusters : int
        Number of k-means clusters.

    Returns
    -------
    inside : ndarray
        View of ``original_img`` restricted to the bounding box; empty when no pixel
        belongs to cluster 1.
    outside : ndarray
        Copy of ``original_img`` with the bounding box zeroed.

    Notes
    -----
    NOTE(review): KMeans is not seeded, so which cluster receives index 1 is not
    controlled by this function -- confirm that index 1 is the intended region.
    NOTE(review): the slices stop before the last row/column that contains a
    cluster-1 pixel (kept as in the original) -- confirm whether that is intended.
    """
    original_shape = original_img.shape
    mask_pixels = mask_img.reshape(-1, 3)
    kmeans = KMeans(n_clusters=n_clusters, max_iter=500)
    kmeans.fit(mask_pixels)

    labels = kmeans.labels_.reshape(original_shape[:2])

    # Bounding box of cluster 1. The original initialised the first bound from the
    # global DataFrame `label` instead of the local `labels` array.
    rows, cols = np.nonzero(labels == 1)
    if rows.size == 0:
        # No pixel in cluster 1: these bounds make both slices below empty.
        row_min, row_max = labels.shape[0], 0
        col_min, col_max = labels.shape[1], 0
    else:
        row_min, row_max = int(rows.min()), int(rows.max())
        col_min, col_max = int(cols.min()), int(cols.max())

    inside = original_img[row_min:row_max, col_min:col_max, :]
    outside = np.array(original_img)
    outside[row_min:row_max, col_min:col_max, :] = 0

    # TODO: expansion / shrinkage, fixed dimension

    return inside, outside

# Crop every image to the bounding box found on its masked counterpart.
# The crops differ in size, so both results stay plain Python lists.
x_inside, x_outside = [], []
for original_i, masked_i in zip(x_img, x_img_masked):
    inside, outside = clustering_slice(original_i, masked_i)
    x_inside.append(inside)
    x_outside.append(outside)

In [None]:
# Keep only the samples whose crop is non-empty, and keep every parallel array aligned.
x_img_valid = []
x_mask_valid = []
x_img_masked_valid = []
x_inside_valid = []
x_outside_valid = []
y_img_valid = []
y_id_valid = []

for i, img in enumerate(x_inside):
    # A crop is unusable when it has zero rows OR zero columns; checking only the
    # rows let zero-width crops through, which break later image operations.
    if img.shape[0] == 0 or img.shape[1] == 0:
        continue
    x_inside_valid.append(img)
    x_outside_valid.append(x_outside[i])
    y_img_valid.append(y_img[i])
    x_img_valid.append(x_img[i])
    x_mask_valid.append(x_mask[i])
    x_img_masked_valid.append(x_img_masked[i])
    y_id_valid.append(y_id[i])

y_img_valid = np.array(y_img_valid)
x_img_valid = np.array(x_img_valid)
x_mask_valid = np.array(x_mask_valid)
x_outside_valid = np.array(x_outside_valid)

print("number of images with all black: ", len(x_inside)-len(x_inside_valid))

# remove all black images and its corresponding labels
x_inside = x_inside_valid
x_outside = x_outside_valid
y_img = y_img_valid
x_img = x_img_valid
x_mask = x_mask_valid
x_img_masked = x_img_masked_valid
y_id = y_id_valid

In [None]:
# index = 1200
# cv2.imshow('img', x_img[index])
# cv2.imshow('mask', x_mask[index])
# cv2.imshow('img_mask', x_img_masked[index])
# cv2.imshow('inside', x_inside[index])
# cv2.imshow('outside', x_outside[index])
# print(x_inside[index].shape)
# cv2.waitKey(0)
# cv2.destroyAllWindows()

In [None]:
# Size statistics of the crops. Image arrays are indexed (row, column), so shape[0]
# is the height and shape[1] the width; the original stored them under swapped names,
# which mislabelled the "width" and "height" histograms that follow.
height = [int(crop.shape[0]) for crop in x_inside]
width = [int(crop.shape[1]) for crop in x_inside]
area = [h * w for h, w in zip(height, width)]

# mean_shape keeps the array order: [mean rows, mean columns].
mean_shape = [sum(height) / len(x_inside), sum(width) / len(x_inside)]

print(mean_shape)

In [None]:
# Histogram of `width` with unit-wide bins whose edges are 0, 1, ..., 224.
width_hist, _ = np.histogram(width, bins=range(225))
fig, ax = plt.subplots()
ax.bar(range(224), width_hist, width=1, edgecolor='none')
ax.set_title("width histogram")
ax.set_xlabel("width")
ax.set_ylabel("number of images")
plt.show()
print(np.mean(width), np.std(width))

In [None]:
# Histogram of `height` with unit-wide bins whose edges are 0, 1, ..., 224.
height_hist, _ = np.histogram(height, bins=range(225))
fig, ax = plt.subplots()
ax.bar(range(224), height_hist, width=1, edgecolor='none')
ax.set_title("height histogram")
ax.set_xlabel("height")
ax.set_ylabel("number of images")
plt.show()
print(np.mean(height), np.std(height))

In [None]:
# Number of equal-width bins: (largest area + 1) // 100, taken from the bincount length.
area_bin_count = len(np.bincount(area)) // 100
area_hist, _ = np.histogram(area, area_bin_count)
fig, ax = plt.subplots()
ax.bar(np.arange(area_bin_count), area_hist, width=1, edgecolor='none')
ax.set_title("area histogram")
ax.set_xlabel("area (width * height / 100)")
ax.set_ylabel("number of images")
plt.show()
print(np.mean(area), np.std(area))

In [None]:
# From here on `x_img` refers to the list of variable-size crops, not the 224x224 images.
x_img = x_inside

### Changing the contrast and brightness

In [None]:
# Gamma look-up table: entry k holds (k / 255) ** gamma * 255, truncated to uint8.
gamma = 1.3
lookUpTable = np.empty((1, 256), np.uint8)
lookUpTable[0, :] = [
    np.clip(pow(level / 255.0, gamma) * 255.0, 0, 255) for level in range(256)
]

def adjust_brightness(img, lookUpTable, alpha=1.3, beta=40):
    """Apply a linear contrast/brightness change followed by a look-up table.

    Parameters
    ----------
    img : ndarray
        Input image.
    lookUpTable : ndarray
        Table passed to ``cv2.LUT`` and applied to the scaled image.
    alpha : float
        Gain passed to ``cv2.convertScaleAbs``.
    beta : float
        Offset passed to ``cv2.convertScaleAbs``.

    Returns
    -------
    ndarray
        The image after scaling and table look-up.
    """
    # The original pre-allocated a zero image that was immediately overwritten;
    # that dead allocation (and the disabled per-pixel loop) is removed.
    scaled = cv2.convertScaleAbs(img, alpha=alpha, beta=beta)
    return cv2.LUT(scaled, lookUpTable)

# x_brightness = np.array([adjust_brightness(xi, lookUpTable) for xi in x_img], dtype=np.uint8)
# print(x_brightness.shape)

# x_preprocessed = np.array(x_brightness, copy=True)

In [None]:
# frame = x_img[10]
# result = x_brightness[10]

# cv2.imshow('frame', frame)
# cv2.imshow('result', result)

# cv2.waitKey(0)

# cv2.destroyAllWindows()

### Clustering filter

In [None]:
def clustering_filter(img, n_clusters=5):
    """Flatten the two least populated colour clusters of an image to their centroid colour.

    The pixels are clustered with k-means; every pixel belonging to a cluster whose
    share of pixels equals one of the two smallest shares is overwritten with that
    cluster's centroid. All other pixels are left untouched.

    Returns an array with the same shape and dtype as ``img``.
    """
    original_shape = img.shape
    pixels = img.reshape(-1, 3)
    kmeans = KMeans(n_clusters=n_clusters)
    kmeans.fit(pixels)

    labels = kmeans.labels_
    centroid = kmeans.cluster_centers_

    # Fraction of all pixels assigned to each cluster.
    total = len(labels)
    percent = [np.count_nonzero(labels == k) / total for k in range(len(centroid))]

    # The two smallest shares mark the clusters to flatten (ties can select more than two).
    sorted_percent = sorted(percent)
    smallest_shares = [sorted_percent[0], sorted_percent[1]]

    result = np.array(pixels, copy=True)
    for k, share in enumerate(percent):
        if share in smallest_shares:
            result[labels == k] = centroid[k]

    return result.reshape(original_shape)

# x_cluster = np.array([clustering_filter(xi) for xi in x_img], dtype=np.uint8)
# print(x_cluster.shape)

# x_preprocessed = np.array(x_cluster, copy=True)

In [None]:
# frame = x_img[321]
# result = x_cluster[321]

# cv2.imshow('frame', frame)
# cv2.imshow('result', result)

# cv2.waitKey(0)

# cv2.destroyAllWindows()

### HSV filter

In [None]:
# Number of selected pixels per processed image, appended by hsv_filter().
dummy_sum = []

def hsv_filter(img, init_value=100, end_value=0, average_value=20000, adaptive=False):
    """Keep only the pixels whose HSV value lies in the low-hue ("red") range.

    A pixel is selected when its hue is in [0, 10] and both saturation and value are
    at least the current threshold; every other pixel is set to zero.

    Parameters
    ----------
    img : ndarray
        Image passed to ``cv2.cvtColor`` with ``COLOR_BGR2HSV``.
    init_value : int
        Initial lower bound for saturation and value.
    end_value : int
        Adaptive mode only: lowest bound that is still tried.
    average_value : number
        Adaptive mode only: lowering stops once more than this many pixels are selected.
    adaptive : bool
        When True, the bound is lowered one step at a time from ``init_value`` until
        enough pixels are selected or ``end_value`` has been tried.

    Returns
    -------
    ndarray
        ``img`` with all unselected pixels zeroed.

    Side effects
    ------------
    Appends the number of selected pixels to the module-level ``dummy_sum`` list.
    """
    # The colour conversion does not depend on the threshold, so do it once.
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    upper_red = np.array([10,255,255])

    def selection(threshold_value):
        """Return the mask and the selected-pixel count for one threshold."""
        lower_red = np.array([0,threshold_value,threshold_value])
        current_mask = cv2.inRange(hsv, lower_red, upper_red)
        return current_mask, np.sum(current_mask/255)

    sv_value = init_value
    # Always evaluate the initial threshold, so `mask` is defined even when the
    # adaptive loop would not execute (e.g. init_value < end_value).
    mask, mask_value = selection(sv_value)

    if adaptive:
        sv_value -= 1
        while mask_value <= average_value and sv_value >= end_value:
            mask, mask_value = selection(sv_value)
            sv_value -= 1

    dummy_sum.append(mask_value)

    # The mask is 0 outside the selected range, so the bitwise AND
    # blanks every pixel that is not in the red range.
    result = cv2.bitwise_and(img, img, mask = mask)

    return result

# Filter every image with a fixed threshold of 120.
x_hsv = [hsv_filter(xi, 120) for xi in x_img]

if use_mask:
    # Crops differ in size, so the results stay a plain list.
    x_preprocessed = x_hsv
else:
    # Equal-size images can be stacked into one uint8 array.
    x_hsv = np.array(x_hsv, dtype=np.uint8)
    print(x_hsv.shape)
    x_preprocessed = np.array(x_hsv, copy=True)

# Average number of selected pixels per image.
print(np.mean(np.array(dummy_sum), axis=0))

In [None]:
# frame = x_img[60]
# result = x_hsv[60]

# cv2.imshow('frame', frame)
# cv2.imshow('result', result)

# cv2.waitKey(0)

# cv2.destroyAllWindows()

### Histogram

In [None]:
# Channel order: Blue, Green, Red (and A for transparency, when present)
def red_histogram(img):
    """Return the 256-bin histogram (bin edges 0..256) of channel index 2 of ``img``."""
    red_values = img[:, :, 2].ravel()
    counts, _edges = np.histogram(red_values, bins=range(257))
    return counts

# One 256-bin histogram of channel 2 per preprocessed image -> shape (num_images, 256).
x_hist = np.array([red_histogram(xi) for xi in x_preprocessed])
print(x_hist.shape)

# Select the histogram features as model input.
# NOTE(review): the "Mean" cell further down reassigns x_final, so whichever of the two
# cells ran last decides the features -- confirm the intended execution order.
x_final = x_hist
y_final = y_img

In [None]:
# Histogram of the first image; bin 0 is left out of the bars.
bar_positions = np.arange(1, 256) - 0.5
plt.bar(bar_positions, x_hist[0][1:], width=1, edgecolor='none')
plt.xlim([-0.5, 255.5])
plt.show()

In [None]:
#np.savez_compressed('./x_hsv_hist', x=x_hist)

## Mean 

In [None]:
def mean_feature(img, mask=False):
    """Return the per-channel mean of an image in its own colour space and in HSV.

    Parameters
    ----------
    img : ndarray, shape (H, W, 3)
        Image passed to ``cv2.cvtColor`` with ``COLOR_BGR2HSV`` (so presumably BGR).
    mask : bool
        When True, pixels whose three channels are all zero are excluded from both
        means; if no pixel remains, zeros are returned.

    Returns
    -------
    ndarray, shape (6,)
        The three channel means of ``img`` followed by the three HSV channel means.
    """
    flatten_img = img.reshape(-1, 3)
    flatten_hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).reshape(-1, 3)

    if mask:
        # Vectorised replacement for the per-pixel Python loop: keep rows with any
        # non-zero channel.
        keep = np.any(flatten_img != 0, axis=1)
        if not keep.any():
            empty_mean = np.array([0, 0, 0], dtype=np.float32)
            return np.concatenate((empty_mean, empty_mean), axis=0)
        flatten_img = flatten_img[keep]
        flatten_hsv_img = flatten_hsv_img[keep]

    bgr_mean = np.mean(flatten_img, axis=0)
    hsv_mean = np.mean(flatten_hsv_img, axis=0)

    return np.concatenate((bgr_mean, hsv_mean), axis=0)

# Six mean features per preprocessed image -> shape (num_images, 6).
# NOTE(review): mean_feature is called with its default mask=False, so pixels zeroed by
# the HSV filter are included in the means -- confirm whether mask=True was intended.
x_mean = np.array([mean_feature(xi) for xi in x_preprocessed])
print(x_mean.shape)

# Overrides the histogram features selected earlier; the cells below use x_final / y_final.
x_final = x_mean
y_final = y_img

In [None]:
# x_hsv = np.array([hsv_filter(xi, 120) for xi in x_img], dtype=np.uint8)
# print(x_hsv.shape)
# print(np.mean(np.array(dummy_sum), axis=0))
# x_preprocessed = np.array(x_hsv, copy=True)
# x_mean = np.array([mean_feature(xi) for xi in x_preprocessed])
# print(x_mean.shape)
# x_mean_list = np.array(x_mean)

# # x_hsv = np.array([hsv_filter(xi, 80) for xi in x_img], dtype=np.uint8)
# # print(x_hsv.shape)
# # print(np.mean(np.array(dummy_sum), axis=0))
# # x_preprocessed = np.array(x_hsv, copy=True)
# # x_mean = np.array([mean_feature(xi) for xi in x_preprocessed])
# # print(x_mean.shape)

# # x_mean_list = np.concatenate((x_mean_list, x_mean), axis=1)
# # print(x_mean_list.shape)


# x_final = x_mean_list
# y_final = y_img

## Train test split

In [None]:
# split by id: subjects with id <= 80 form the test set, all others the training set,
# so every image of one subject lands on the same side.
subject_ids = np.asarray(y_id)
is_train = subject_ids > 80
is_test = subject_ids <= 80
x_train, y_train = x_final[is_train], y_final[is_train]
x_test, y_test = x_final[is_test], y_final[is_test]

#x_train, y_train = x_final[[i < 620 for i in y_id]], y_final[[i < 620 for i in y_id]]
#x_test, y_test = x_final[[i >= 620 for i in y_id]], y_final[[i >= 620 for i in y_id]]

# Report the class balance (classification) or the mean target (regression) of both splits.
if mode == "classification":
    if binary:
        print("train: (0)", np.sum(y_train==0), "(1)", np.sum(y_train==1))
        print("test: (0)", np.sum(y_test==0), "(1)", np.sum(y_test==1))
    else:
        for i in range(len(threshold)+1):
            print(i)
            print("train:", np.sum(y_train==i), " test:", np.sum(y_test==i))
elif mode == "regression":
    print(np.mean(y_train))
    print(np.mean(y_test))

In [None]:
# normalization
# scaler = MinMaxScaler()
# x_train = scaler.fit_transform(x_train)
# x_test = scaler.transform(x_test)

In [None]:
# Per-group mean of the training features with a +/- one standard deviation band.
# Column 0 is skipped and the x positions assume 256 feature columns.
bin_positions = np.arange(256)[1:] - 0.5

# (row selector, legend label) for every curve, in drawing order.
group_selectors = []
if mode == "classification":
    if binary:
        group_selectors = [(y_train==0, 'non-anemia'), (y_train==1, 'anemia')]
    else:
        group_selectors = [(y_train==i, threshold_name[i]) for i in range(len(threshold)+1)]
elif mode == "regression":
    group_selectors = [(y_train>=12.5, 'non-anemia'), (y_train<12.5, 'anemia')]

for selector, group_name in group_selectors:
    group_mean = x_train[selector].mean(axis=0)[1:]
    group_std = x_train[selector].std(axis=0)[1:]
    plt.plot(bin_positions, group_mean, label=group_name)
    plt.fill_between(bin_positions, group_mean-group_std, group_mean+group_std, alpha=0.4)
plt.legend()
plt.show()

In [None]:
# Per-group mean of the test features with a +/- one standard deviation band.
# Column 0 is skipped and the x positions assume 256 feature columns.
bin_positions = np.arange(256)[1:] - 0.5

# (row selector, legend label) for every curve, in drawing order.
group_selectors = []
if mode == "classification":
    if binary:
        group_selectors = [(y_test==0, 'non-anemia'), (y_test==1, 'anemia')]
    else:
        group_selectors = [(y_test==i, threshold_name[i]) for i in range(len(threshold)+1)]
elif mode == "regression":
    group_selectors = [(y_test>=12.5, 'non-anemia'), (y_test<12.5, 'anemia')]

for selector, group_name in group_selectors:
    group_mean = x_test[selector].mean(axis=0)[1:]
    group_std = x_test[selector].std(axis=0)[1:]
    plt.plot(bin_positions, group_mean, label=group_name)
    plt.fill_between(bin_positions, group_mean-group_std, group_mean+group_std, alpha=0.4)
plt.legend()
plt.show()

In [None]:
# Binary classification only: per-class mean of every training feature column
# with a +/- one standard deviation band.
if mode == "classification":
    if binary:
        feature_positions = np.arange(x_train.shape[1])
        for class_value, class_name in ((0, 'non-anemia'), (1, 'anemia')):
            class_rows = x_train[y_train==class_value]
            class_mean = class_rows.mean(axis=0)
            class_std = class_rows.std(axis=0)
            plt.plot(feature_positions, class_mean, label=class_name)
            plt.fill_between(feature_positions, class_mean-class_std, class_mean+class_std, alpha=0.4)
plt.legend()
plt.show()

In [None]:
# Binary classification only: per-class mean of every test feature column
# with a +/- one standard deviation band.
if mode == "classification":
    if binary:
        feature_positions = np.arange(x_test.shape[1])
        for class_value, class_name in ((0, 'non-anemia'), (1, 'anemia')):
            class_rows = x_test[y_test==class_value]
            class_mean = class_rows.mean(axis=0)
            class_std = class_rows.std(axis=0)
            plt.plot(feature_positions, class_mean, label=class_name)
            plt.fill_between(feature_positions, class_mean-class_std, class_mean+class_std, alpha=0.4)
plt.legend()
plt.show()

## Classifiers

In [None]:
def print_results(y_true, y_hat, mode, binary, debug):
    """Compute evaluation metrics for one model and optionally print/plot them.

    Parameters
    ----------
    y_true : array-like
        Ground-truth targets.
    y_hat : ndarray
        Classification: per-class scores of shape (n_samples, n_classes); the predicted
        class is the argmax of each row. Regression: predicted values.
    mode : str
        "classification" or "regression"; any other value yields an empty result.
    binary : bool
        Classification only: selects binary averaging and uses column 1 of ``y_hat``
        as the score for ROC AUC; otherwise macro averaging and one-vs-one AUC.
    debug : bool
        When True, print every metric and (classification) show the confusion matrix.

    Returns
    -------
    dict
        Classification: accuracy, precision, recall, roc_auc, f1, cohen_kappa.
        Regression: mse, mae.
    """
    results = {}

    if mode == "classification":
        if binary:
            average, multi_class = "binary", "raise"
            display_labels = ["non-anemia", "anemia"]
            auc_scores = y_hat[:, 1]
        else:
            average, multi_class = "macro", "ovo"
            display_labels = threshold_name
            auc_scores = y_hat

        # Hard predictions are derived once and shared by all label-based metrics.
        y_pred = np.argmax(y_hat, axis=1)

        results["accuracy"] = accuracy_score(y_true, y_pred)
        results["precision"] = precision_score(y_true, y_pred, average=average)
        results["recall"] = recall_score(y_true, y_pred, average=average)
        results["roc_auc"] = roc_auc_score(y_true, auc_scores, multi_class=multi_class)
        results["f1"] = f1_score(y_true, y_pred, average=average)
        results["cohen_kappa"] = cohen_kappa_score(y_true, y_pred)

        cm = confusion_matrix(y_true, y_pred)
        if debug:
            ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels).plot()
            plt.show()
    elif mode == "regression":
        results["mse"] = mean_squared_error(y_true, y_hat)
        results["mae"] = mean_absolute_error(y_true, y_hat)

    if debug:
        for metric_name, metric_value in results.items():
            print(metric_name, ": ", metric_value)

    return results

In [None]:
def evaluate_one_round(x_train, y_train, x_test, y_test, debug=True):
    """Fit every baseline model on one train/test split and score it.

    The model family is chosen by the notebook-level `mode` ("classification" or
    "regression"); the notebook-level `binary` flag is forwarded to
    `print_results` and decides how hard predictions are exported.

    Args:
        x_train, y_train: training features and targets.
        x_test, y_test: held-out features and targets.
        debug: when True, print each model's name and let `print_results`
            print / plot its scores.

    Returns:
        (prediction_csv, prediction_results):
        - prediction_csv maps "ground truth" to `y_test` and, in classification
          mode, each model name to its hard predictions (boolean
          "class-1 probability >= 0.5" when `binary`, otherwise the arg-max
          class index).
        - prediction_results maps each model name to the metric dict returned
          by `print_results`.
    """
    prediction_csv = {"ground truth": y_test}
    prediction_results = {}

    # (display name, unfitted estimator) in the order they are fitted and reported.
    if mode == "classification":
        models = [
            ("Linear Discriminant Analysis", LinearDiscriminantAnalysis()),
            ("Logistic regression", LogisticRegression(random_state=0, max_iter=10000, solver='saga')),
            ("Random forest", RandomForestClassifier(n_estimators=100, max_depth=100, random_state=0)),
            ("SVM", SVC(random_state=0, C=5.0, probability=True)),
            ("XGBoost", XGBClassifier(random_state=0)),
        ]
    elif mode == "regression":
        models = [
            ("Linear regression", LinearRegression()),
            ("Random forest", RandomForestRegressor(n_estimators=100, max_depth=5, random_state=0)),
            ("SVM", SVR(C=5.0)),
            ("XGBoost", XGBRegressor(n_estimators=10, random_state=0)),
        ]
    else:
        models = []

    for name, clf in models:
        clf.fit(x_train, y_train)
        if debug:
            print(name)

        if mode == "classification":
            # Predict once and reuse the scores for both the metrics and the export.
            y_hat = clf.predict_proba(x_test)
            prediction_results[name] = print_results(y_test, y_hat, mode, binary, debug)
            if binary:
                prediction_csv[name] = y_hat[:, 1] >= 0.5
            else:
                # Thresholding column 1 is meaningless with more than two classes.
                prediction_csv[name] = np.argmax(y_hat, axis=1)
        else:
            prediction_results[name] = print_results(y_test, clf.predict(x_test), mode, binary, debug)

    # Regression results only carry "mse"/"mae", so rank those by lowest mse;
    # classification keeps ranking by highest accuracy.
    if mode == "regression":
        metric, direction, best_value = "mse", "lowest ", float("inf")
    else:
        metric, direction, best_value = "accuracy", "highest ", -1

    method = None
    for name, scores in prediction_results.items():
        if mode == "regression":
            improved = scores[metric] < best_value
        else:
            improved = scores[metric] > best_value
        if improved:
            best_value = scores[metric]
            method = name
    print(direction, metric, ": ", method, " ", best_value)

    return prediction_csv, prediction_results

In [None]:
# evaluate_one_round returns (prediction_csv, prediction_results); unpack both so
# prediction_csv is the per-model prediction dict rather than the whole tuple.
prediction_csv, prediction_results = evaluate_one_round(x_train, y_train, x_test, y_test)

In [None]:
# output prediction to csv file

# pd_index = ["y_id"]
# pd_data = np.expand_dims(np.array(np.array(y_id)[[i <= 80 for i in y_id]]), axis=-1)
# for key, value in prediction_csv.items():
#     pd_index.append(key)
#     value = np.expand_dims(np.array(value, dtype=np.int32), axis=-1)
#     pd_data = np.concatenate((pd_data, value), axis=1)
    
# pd_index = np.array(pd_index)

# pd_prediction = pd.DataFrame(data=pd_data, columns=pd_index)

# pd_prediction.to_csv("prediction(hsv_histogram_red).csv")

## Visualization

In [None]:
if mode == "classification":
    # Images are split by sample id: ids above 80 are training images, the rest test images.
    train_img_mask = [sample_id > 80 for sample_id in y_id]
    test_img_mask = [sample_id <= 80 for sample_id in y_id]
    x_img_train, y_img_train = x_img[train_img_mask], y_img[train_img_mask]
    x_img_test, y_img_test = x_img[test_img_mask], y_img[test_img_mask]

    # NOTE(review): `clf` has to exist at notebook scope here; evaluate_one_round keeps
    # its classifiers local, so confirm which fitted model this is meant to be.
    predictions = np.argmax(clf.predict_proba(x_test), axis=1)
    correct_index = predictions == y_test

    # Flash every correctly classified test image for 500 ms with its true label drawn on it.
    for idx, is_correct in enumerate(correct_index):
        if not is_correct:
            continue
        correct_img = x_img_test[idx].astype(np.uint8)
        cv2.putText(correct_img, str(y_test[idx]), (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
        cv2.imshow("correct", correct_img)
        cv2.waitKey(500)
    cv2.destroyAllWindows()

In [None]:
if mode == "classification":
    wrong_index = predictions != y_test

    # Flash every misclassified test image for 500 ms with its true label drawn on it.
    for idx, is_wrong in enumerate(wrong_index):
        if not is_wrong:
            continue
        wrong_img = x_img_test[idx].astype(np.uint8)
        cv2.putText(wrong_img, str(y_test[idx]), (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
        cv2.imshow("wrong", wrong_img)
        cv2.waitKey(500)
    cv2.destroyAllWindows()

## K-fold cross-validation

In [None]:
# id_list = np.arange(np.min(y_id), np.max(y_id)+1)
# kf = KFold(n_splits=10)
# #kf = KFold(n_splits=5, shuffle=True, random_state=0)
# for i, (train_index, test_index) in enumerate(kf.split(id_list)):
#     print(f"Fold {i}:")
#     #print(f"  Train: index={train_index}")
#     #print(f"  Test:  index={test_index}")
#     train_id = id_list[train_index]
#     test_id = id_list[test_index]
#     #print(f"  Train: index={train_id}")
#     #print(f"  Test:  index={test_id}")
    
#     x_train, y_train = x_final[train_id], y_final[train_id]
#     x_test, y_test = x_final[test_id], y_final[test_id]
    
#     print("train: (0)", np.sum(y_train==0), "(1)", np.sum(y_train==1))
#     print("test: (0)", np.sum(y_test==0), "(1)", np.sum(y_test==1))
    
#     prediction_csv = evaluate_one_round()

In [None]:
# Load the pre-computed k-fold id splits. The two arrays are iterated pairwise
# further below, one entry per fold, and each entry is used as the collection of
# sample ids belonging to that fold's train / test side.
# NOTE(review): allow_pickle=True unpickles arbitrary objects - only load files you trust.
train_id = np.load("k fold (feature level)/train_id/train_id.npy", allow_pickle=True)
test_id = np.load("k fold (feature level)/test_id/test_id.npy", allow_pickle=True)

In [None]:
def load_feature(path):
    """Load a feature CSV into a dict of NumPy arrays keyed by column name.

    Args:
        path: path of the CSV file to read.

    Returns:
        dict with one NumPy array per CSV column. The keys "id", "mean_H",
        "mean_S", "mean_V", "mean_R", "mean_G" and "mean_B" are always present;
        any of them missing from the CSV stays an empty list.
    """
    feature = {"id": [], "mean_H": [], "mean_S": [], "mean_V": [], "mean_R": [], "mean_G": [], "mean_B": []}

    # "Unnamed: 0" is the index column pandas adds when a frame is saved with its
    # index. errors="ignore" keeps files written without an index loadable instead
    # of failing with a KeyError.
    feature_pd = pd.read_csv(path).drop(columns=["Unnamed: 0"], errors="ignore")

    for column in feature_pd.columns:
        feature[column] = np.array(feature_pd[column].to_list())

    return feature

In [None]:
kfold_prediction_results = []

feature_fusion = True

# Ids that were matched during fusion. Nothing in this cell reads the list; it is
# initialised here so the cell runs on a fresh kernel.
# NOTE(review): confirm no earlier cell is expected to seed `repeat`.
repeat = []

# Order in which the six colour means are appended to every sample.
FUSED_COLUMNS = ("mean_G", "mean_B", "mean_R", "mean_H", "mean_S", "mean_V")


def _fuse_features(x, sample_ids, feature):
    """Append the colour means stored in `feature` to the rows of `x`, matched by id.

    Args:
        x: per-sample feature rows, aligned with `sample_ids`.
        sample_ids: id of every row in `x`.
        feature: dict as returned by `load_feature` ("id" plus FUSED_COLUMNS).

    Returns:
        (fused, keep): `fused` stacks one concatenated row per sample whose id
        occurs in feature["id"]; `keep` is a boolean mask over the rows of `x`
        marking those samples, so callers can filter the labels the same way.
    """
    # First occurrence wins, like a linear scan that stops at the first match.
    first_row = {}
    for row, feature_id in enumerate(feature["id"]):
        first_row.setdefault(feature_id, row)

    fused, keep = [], []
    for sample, sample_id in zip(x, sample_ids):
        row = first_row.get(sample_id)
        keep.append(row is not None)
        if row is None:
            continue
        extra = np.array([feature[column][row] for column in FUSED_COLUMNS])
        fused.append(np.concatenate((np.array(sample), extra), axis=0))
    return np.array(fused), np.array(keep, dtype=bool)


for i, (train_index, test_index) in enumerate(zip(train_id, test_id)):
    print(f"Fold {i}:")

    # Boolean masks over all samples: does the sample's id belong to this fold's
    # train / test ids?
    train_idx = [sample_id in train_index for sample_id in y_id]
    test_idx = [sample_id in test_index for sample_id in y_id]

    x_train, y_train = x_final[train_idx], y_final[train_idx]
    x_test, y_test = x_final[test_idx], y_final[test_idx]

    if feature_fusion:
        train_feature = load_feature("k fold (feature level)/k_fold_feature/train_feature_kfold_"+str(i)+".csv")
        test_feature = load_feature("k fold (feature level)/k_fold_feature/test_feature_kfold_"+str(i)+".csv")
        y_train_id = np.array(y_id)[train_idx]
        y_test_id = np.array(y_id)[test_idx]

        x_train_fusion, train_keep = _fuse_features(x_train, y_train_id, train_feature)
        x_test_fusion, test_keep = _fuse_features(x_test, y_test_id, test_feature)
        repeat.extend(y_train_id[train_keep])
        repeat.extend(y_test_id[test_keep])

        # A sample without a row in the feature file has no fused vector. Drop its
        # label as well so features and targets stay aligned row for row.
        n_dropped_train = int(np.sum(~train_keep))
        n_dropped_test = int(np.sum(~test_keep))
        if n_dropped_train or n_dropped_test:
            print("dropped (no matching feature row): train", n_dropped_train, "test", n_dropped_test)
        y_train, y_test = y_train[train_keep], y_test[test_keep]

        x_train = x_train_fusion
        x_test = x_test_fusion

    print("train: (0)", np.sum(y_train==0), "(1)", np.sum(y_train==1))
    print("test: (0)", np.sum(y_test==0), "(1)", np.sum(y_test==1))

    # Optional standardisation (disabled):
    # scaler = StandardScaler()
    # x_train = scaler.fit_transform(x_train)
    # x_test = scaler.transform(x_test)

    # Optional export of the fused split (disabled):
    # np.savez("feature_fusion"+str(i)+".npz", x_train=x_train, y_train=y_train, x_test=x_test, y_test=y_test)

    prediction_csv, prediction_results = evaluate_one_round(x_train, y_train, x_test, y_test, False)

    kfold_prediction_results.append(prediction_results)

In [None]:
# Per-model mean and standard deviation of one metric across the folds.
# Classification folds are summarised by accuracy; regression results only carry
# "mse"/"mae", so they are summarised by mse instead of failing on a missing key.
summary_metric = "accuracy" if mode == "classification" else "mse"

results = {}
for fold_results in kfold_prediction_results:
    for method, scores in fold_results.items():
        results.setdefault(method, []).append(scores[summary_metric])

for method, values in results.items():
    print(method, ": ", np.mean(values), np.std(values))