In [None]:
import cv2
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import matplotlib
import seaborn as sns
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from scipy.special import digamma
from sklearn.neighbors import KDTree
from numpy.polynomial.polynomial import polyfit
from scipy import stats
from math import sqrt

In [None]:
def get_radius_kneighbors(x, n_neighbors):
    """Return, for every row of ``x``, a radius just inside its k-th neighbour.

    A Chebyshev (max-norm) KD-tree is built on ``x`` and queried with ``x``
    itself, so ``n_neighbors + 1`` hits are requested: each row is also found
    as its own neighbour. The largest returned distance per row is then moved
    one representable float toward zero (a distance of exactly 0 stays 0).

    Parameters
    ----------
    x : array-like accepted by ``KDTree``
    n_neighbors : int

    Returns
    -------
    numpy.ndarray
        One radius per row of ``x``.
    """
    tree = KDTree(x, metric="chebyshev")
    distances, _ = tree.query(x, k=n_neighbors + 1)
    kth_distance = distances[:, -1]
    return np.nextafter(kth_distance, 0)


def num_points_within_radius(x, radius):
    """Count, for every row of ``x``, the other rows lying within ``radius``.

    Uses a Chebyshev (max-norm) KD-tree built on ``x`` and queried with ``x``.
    One is subtracted from every count so the query row itself is not
    counted; the result is therefore a float array.

    Parameters
    ----------
    x : array-like accepted by ``KDTree``
    radius : radius (or per-row radii) passed to ``KDTree.query_radius``

    Returns
    -------
    numpy.ndarray
        Neighbour counts minus one, as floats.
    """
    tree = KDTree(x, metric="chebyshev")
    counts_including_self = tree.query_radius(
        x,
        radius,
        count_only=True,
        return_distance=False,
    )
    return np.array(counts_including_self) - 1.0


def preprocess_data(x):
    """Convert ``x`` to a 2-D float64 array and rescale each column.

    A 1-D input becomes a single column. Each column is divided by the mean
    of its absolute values; that mean is floored at ``1e-100`` so an all-zero
    column does not cause a division by zero.

    Raises
    ------
    ValueError
        If ``x`` is neither 1-D nor 2-D.
    """
    arr = np.array(x, dtype=np.float64)

    if arr.ndim not in (1, 2):
        raise ValueError(f'x.ndim = {arr.ndim}, should be 1 or 2')
    if arr.ndim == 1:
        arr = arr.reshape(-1, 1)

    # Per-column mean absolute value, kept strictly positive.
    column_scale = np.maximum(1e-100, np.mean(np.abs(arr), axis=0))
    inverse_scale = 1 / column_scale

    return inverse_scale * arr


def compute_mi(x, y, n_neighbors=5):
    """Estimate the mutual information between ``x`` and ``y``.

    k-nearest-neighbour estimator (labelled "Kraskov" in the original code).
    Both inputs are rescaled with ``preprocess_data`` and stacked side by side;
    a per-sample radius is taken from the joint space and the number of
    neighbours inside that radius is counted separately in ``x`` and in ``y``.

    Samples whose radius is exactly 0 use, instead of ``n_neighbors``, the
    number of other identical joint rows among those zero-radius samples.

    Parameters
    ----------
    x, y : array-like, 1-D or 2-D, with the same number of samples
    n_neighbors : int, default 5

    Returns
    -------
    float
        The estimate, clipped below at 0.
    """
    n_samples = len(x)
    x = preprocess_data(x)
    y = preprocess_data(y)
    joint = np.hstack((x, y))

    k = np.full(n_samples, n_neighbors)
    radius = get_radius_kneighbors(joint, n_neighbors)

    # Zero radius: replace k by the multiplicity of each such joint row.
    zero_radius = (radius == 0)
    if zero_radius.sum() > 0:
        _, inverse, multiplicity = np.unique(
            joint[zero_radius], axis=0, return_inverse=True, return_counts=True
        )
        k[zero_radius] = multiplicity[inverse] - 1

    nx = num_points_within_radius(x, radius)
    ny = num_points_within_radius(y, radius)

    estimate = (
        digamma(n_samples) + np.mean(digamma(k))
        - np.mean(digamma(nx + 1)) - np.mean(digamma(ny + 1))
    )
    return max(0, estimate)

def greedy(x, y, n_neighbors=5, score_fn=None):
    """Greedy forward selection of the columns of ``x`` that best explain ``y``.

    Starting from an empty set, every remaining column is tried in turn
    together with the columns chosen so far; the one giving the highest score
    is kept if it strictly improves on the best score seen (initially 0).
    Selection stops as soon as no candidate improves the score.

    Parameters
    ----------
    x : array-like, shape (n_samples, n_features)
    y : array-like
        Target passed through unchanged to the scoring function.
    n_neighbors : int, default 5
        Passed through to the scoring function.
    score_fn : callable, optional
        ``score_fn(x_subset, y, n_neighbors) -> float``. Defaults to
        ``compute_mi``.

    Returns
    -------
    list of int
        Selected column indices, in the order they were chosen.
    """
    if score_fn is None:
        score_fn = compute_mi

    x = np.asarray(x)
    selected = []
    remaining = list(range(x.shape[1]))
    best_score = 0

    while remaining:
        scores = np.array([
            score_fn(x[:, selected + [col]], y, n_neighbors)
            for col in remaining
        ])
        best_pos = int(np.argmax(scores))
        if not scores[best_pos] > best_score:
            break
        best_score = scores[best_pos]
        # Remove by *position* in `remaining`: the column number stored there
        # stops matching its position after the first removal.
        selected.append(remaining.pop(best_pos))

    return selected

In [None]:
def _read_indexed(csv_path):
    """Read ``csv_path`` and label its rows 1..100 instead of 0..99."""
    return pd.read_csv(csv_path).set_index(np.arange(1, 101))


# Expert survey scores (one column per question) and the feature tables.
survey = _read_indexed('../datasets/FinUI/100_avg_scores.csv')
img_features = _read_indexed('../datasets/FinUI/100_img_features.csv')
txt_features = _read_indexed('../datasets/FinUI/100_txt_features.csv')
ae_features = _read_indexed('../datasets/FinUI/100_ae_features.csv')
aeb_features = _read_indexed('../datasets/FinUI/100_aeb_features.csv')

# Flat list of feature names, each prefixed with the table it comes from.
# The order (img, txt, ae) matches the column order used to build X.
features = []
for prefix, frame in (
    ('img_', img_features),
    ('txt_', txt_features),
    ('ae_', ae_features),
):
    features.extend(prefix + column for column in frame.columns.to_list())

questions = survey.columns.to_list()

In [None]:
# Design matrix: img, txt and ae features side by side, missing values as 0.
feature_blocks = [
    frame.fillna(0).values
    for frame in (img_features, txt_features, ae_features)
]
X = np.hstack(feature_blocks)
y = survey.values

# Fixed split: the first 70 rows train, the remaining rows test.
# (sklearn's train_test_split(X, y, test_size=0.3, random_state=2) would give
# a random split instead.)
X_train, X_test = X[:70, :], X[70:, :]
y_train, y_test = y[:70, :], y[70:, :]

# Standardise with statistics from the training rows only, then apply the
# same transform to the test rows and to the full matrix.
scaler = StandardScaler().fit(X_train)
X_train, X_test, X = (
    scaler.transform(part) for part in (X_train, X_test, X)
)

In [None]:
# For every survey question: score each candidate feature by mutual
# information with the training target, then sweep an MI threshold, fit a
# linear regression on the features above it and record the correlation
# between its predictions and the held-out scores.
# NOTE(review): the threshold is picked by correlation on X_test/y_test, so
# the reported `cc` is an optimistic estimate.
th = np.arange(0.08, 0.23, 0.01)
results = {}
for q_pos, question in enumerate(questions):
    # Candidate columns of X for this question.
    # NOTE(review): 20:35 and :20 presumably select the txt_ and img_ feature
    # groups respectively - confirm against the feature tables.
    cols = np.arange(len(features))
    if question == '2_1':
        cols = cols[20:35]
    if question == '3_2':
        cols = cols[:20]

    target_train = y_train[:, q_pos]
    target_test = y_test[:, q_pos]
    mi = np.array([compute_mi(X_train[:, c], target_train, 5) for c in cols])

    res_loc = []
    for thresh in th:
        keep = cols[mi > thresh]
        if keep.size == 0:
            # Nothing passes this threshold: mark it as unusable.
            res_loc.append(-np.inf)
            continue
        reg = LinearRegression().fit(X_train[:, keep], target_train)
        cc = np.corrcoef(reg.predict(X_test[:, keep]), target_test)[0, 1]
        # A NaN correlation (constant predictions or targets) is unusable too;
        # storing -inf keeps max() and argmax() in agreement below.
        res_loc.append(cc if np.isfinite(cc) else -np.inf)
    results[question] = res_loc

# Best threshold per question; `cc` and `thresh` come from the same sweep entry.
best_pos = {q: int(np.argmax(results[q])) for q in questions}
best = pd.DataFrame({
    'q': questions,
    'cc': [results[q][best_pos[q]] for q in questions],
    'thresh': [th[best_pos[q]] for q in questions],
})

In [None]:
# Feature-importance table: rows are features, columns are questions. Each
# entry is the absolute linear-regression coefficient of a feature selected
# for that question (0 for features that were not selected).
selected_features = np.zeros([len(features), len(questions)])
for question, thresh in zip(best['q'], best['thresh']):
    # Resolve the column by question name rather than by DataFrame label.
    q_pos = questions.index(question)

    # Candidate columns of X for this question.
    # NOTE(review): 20:35 and :20 presumably select the txt_ and img_ feature
    # groups respectively - confirm against the feature tables.
    cols = np.arange(len(features))
    if question == '2_1':
        cols = cols[20:35]
    if question == '3_2':
        cols = cols[:20]

    target = y_train[:, q_pos]
    mi = np.array([compute_mi(X_train[:, c], target, 5) for c in cols])

    keep = cols[mi > thresh]
    if keep.size == 0:
        # No feature exceeds the threshold: fitting on zero columns would
        # raise, so leave this question's column at 0.
        continue

    reg = LinearRegression().fit(X_train[:, keep], target)
    selected_features[keep, q_pos] = np.abs(reg.coef_)

In [None]:
# Same threshold sweep as for the single questions, but against the total
# score (mean over all questions) and with 3 neighbours for the MI estimate.
th = np.arange(0., 0.23, 0.01)
ft = features
y_total_train = y_train.mean(axis=1)
y_total_test = y_test.mean(axis=1)

# Mutual information of every feature with the total training score.
mi = [
    compute_mi(X_train[:, features.index(name)], y_total_train, 3)
    for name in ft
]

# Correlation on the held-out rows for each threshold; -inf when no feature
# passes the threshold.
res = []
for thresh in th:
    above = np.array(mi) > thresh
    if not above.any():
        res.append(-np.inf)
    else:
        model = LinearRegression().fit(X_train[:, above], y_total_train)
        res.append(
            np.corrcoef(model.predict(X_test[:, above]), y_total_test)[0, 1]
        )

In [None]:
# Refit the total-score model at the best threshold of the sweep and append
# its absolute coefficients as an extra, last column of the importance table.
best_total_thresh = th[np.argmax(res)]
idx = np.array(mi) > best_total_thresh

y_total_train = y_train.mean(axis=1)
y_total_test = y_test.mean(axis=1)
reg = LinearRegression().fit(X_train[:, idx], y_total_train)

total_column = np.zeros([len(features), 1])
total_column[idx, 0] = np.abs(reg.coef_)
selected_features_ = np.hstack((selected_features, total_column))

In [None]:
# Heatmap of absolute regression coefficients: one row per feature, one
# column per question plus a final 'Total' column. Colours saturate at 1.
fig, ax = plt.subplots(figsize=(12, 10))

heatmap_style = dict(
    cmap='Blues',
    vmin=0,
    vmax=1,
    linewidths=0.1,
    linecolor='#ededed',
)
sns.heatmap(
    selected_features_,
    xticklabels=questions + ['Total'],
    yticklabels=features,
    ax=ax,
    **heatmap_style,
)

ax.tick_params(axis='y', colors='black')
ax.set_xlabel('Questions', labelpad=10, fontsize=16)
ax.set_title('LR Feature Importance', pad=10, fontsize=18)
plt.show()

In [None]:
# Predicted vs. expert total score for all rows, with a least-squares line.
z = y.mean(axis=1)
prediction = reg.predict(X[:, idx])

# numpy.polynomial's polyfit returns coefficients lowest degree first.
intercept, slope = polyfit(z, prediction, 1)

fig, ax = plt.subplots()
ax.scatter(z, prediction)
ax.plot(z, intercept + slope * z, '-', c='b')
ax.set_xlabel('Expert Scores', labelpad=10, fontsize=14)
ax.set_ylabel('Score Predictions', labelpad=10, fontsize=14)
ax.set_title('Total score', pad=10, fontsize=14)

In [None]:
# Show one screenshot next to a radar chart comparing the per-question
# predictions with the expert scores for the same row.
select_idx = 88

# NOTE(review): the CSV inputs are read from '../datasets/FinUI/' while this
# path has no leading '../' - confirm which location is intended.
img_dir = 'datasets/FinUI/images'


def _image_sort_key(file_name):
    """Sort purely numeric file stems numerically, all other names after them."""
    stem = os.path.splitext(file_name)[0]
    if stem.isdigit():
        return (0, int(stem), file_name)
    return (1, 0, file_name)


# os.listdir returns entries in arbitrary order, so sort them to make
# images[select_idx] reproducible. splitext also copes with names that have
# no dot or several dots.
# NOTE(review): assumes the sorted position of an image matches its row in
# the survey / feature tables - confirm against the dataset.
images = sorted(
    (name for name in os.listdir(img_dir) if os.path.splitext(name)[1] == '.png'),
    key=_image_sort_key,
)

fig = plt.figure(figsize=(14,6))

ax1 = fig.add_subplot(121)

img_path = os.path.join(img_dir, images[select_idx])
img = cv2.imread(img_path)
if img is None:
    # cv2.imread signals failure by returning None instead of raising.
    raise FileNotFoundError(f'could not read image: {img_path}')
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.copyMakeBorder(img, 2, 2, 2, 2, cv2.BORDER_CONSTANT, value=(99,102,106))

ax1.imshow(img)
ax1.set_axis_off()

ax = fig.add_subplot(122, projection='polar')

# Per-question prediction for the selected row. Loop-local names (q_*) are
# used so the total-score model in `reg` / `idx` is not overwritten.
s = []
for q_pos, question in enumerate(questions):
    thresh = best.loc[best.q==question, 'thresh'].iloc[0]

    # Candidate columns of X for this question.
    # NOTE(review): 20:35 and :20 presumably select the txt_ and img_ feature
    # groups respectively - confirm against the feature tables.
    q_cols = np.arange(len(features))
    if question=='2_1':
        q_cols = q_cols[20:35]
    if question=='3_2':
        q_cols = q_cols[:20]

    q_target = y_train[:, q_pos]
    q_mi = np.array([compute_mi(X_train[:, c], q_target, 5) for c in q_cols])
    q_keep = q_cols[q_mi > thresh]

    if q_keep.size == 0:
        # No feature passes the threshold: fall back to the training mean
        # instead of fitting a regression on zero columns.
        s.append(q_target.mean())
        continue

    q_reg = LinearRegression().fit(X_train[:, q_keep], q_target)
    s.append(q_reg.predict(X[:, q_keep])[select_idx])

predict = prediction[select_idx]
expert = survey.values.mean(axis=1)[select_idx]

# Rescale the per-question predictions so their mean equals the predicted
# total score.
s = np.array(s)
s = predict*s/np.average(s)

# Repeat the first point at the end so each polygon is closed.
values = list(s)
values += values[:1]

n = len(questions)
angles = [i / float(n) * 2 * np.pi for i in range(n)]
angles += angles[:1]

ax.plot(angles, values, linewidth=1, linestyle='solid', label=f'prediction: {predict:.2f}')
ax.fill(angles, values, 'b', alpha=0.1)

values = list(survey.iloc[select_idx])
values += values[:1]
ax.plot(angles, values, linewidth=1, linestyle='solid', label=f'expert: {expert:.2f}')
ax.fill(angles, values, 'r', alpha=0.1)

# First question at the top, proceeding clockwise.
ax.set_theta_offset(np.pi / 2)
ax.set_theta_direction(-1)
ax.set_xticks(angles[:-1], questions)
ax.set_rlabel_position(0)
ax.set_ylim([0, 5])
ax.set_yticks([1, 2, 3, 4], ["1", "2","3", "4"], color="grey", size=7)
ax.legend(loc=5, shadow=True)

plt.show()