In [1]:
import os
import pickle

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import cv2
from scipy import stats
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.feature_selection import RFE

In [2]:
def find(s, ch):
    """Return the positions in ``s`` whose element equals ``ch``.

    ``s`` is walked once with ``enumerate``; the matching indices are
    returned as a list in ascending order (empty when nothing matches).
    """
    positions = []
    for index, item in enumerate(s):
        if item == ch:
            positions.append(index)
    return positions

In [3]:
# Build the feature table: one row of summary statistics per CSV file found under
# <compression level>/<train|validate>/.
# Rows are gathered in a list and turned into a DataFrame once at the end:
# DataFrame.append was removed in pandas 2.0, and where it still exists it copies
# the whole frame on every call.
# NOTE(review): a frame built this way keeps the key order of new_row and the native
# dtypes of the values; if a previously pickled selector/model rejects it, retrain.
feature_rows = []

compression_levels = ["low", "medium", "high"]
train_validate_list = ["train", "validate"]

for level_i, compression_level in enumerate(compression_levels):
    for train_validate in train_validate_list:
        path = os.path.join(compression_level, train_validate)
        # os.listdir gives no ordering guarantee; sort so the row order is the same on every run.
        for csv_name in sorted(os.listdir(path)):
            file_path = os.path.join(path, csv_name)
            # The first line of the file and the first column are discarded;
            # what remains is used as a 2-D array of values.
            df = pd.read_csv(file_path, header=None, skiprows=1)
            df = df.drop(df.columns[0], axis=1)
            df = df.to_numpy()

            # Uncomment this code to display the images
            # df = df.T
            # df = df[::-1]
            # plt.imshow(df)
            # plt.show()

            # df_gray = cv2.cvtColor(df, cv2.COLOR_BGR2GRAY)
            df_blur = cv2.GaussianBlur(df, (3,3), 0)

            # NOTE(review): the cast to uint8 assumes the blurred values fit in 0..255 — confirm.
            edges = cv2.Canny(image=np.uint8(df_blur), threshold1=100, threshold2=200) # Canny Edge Detection

            # edges = 255 - edges
            # NOTE(review): Canny's output is an 8-bit image, so this comparison looks like it
            # can never be true and edge pixels keep their original value in the sum below.
            # `edges > 0` was probably intended; left as-is because the saved model was
            # fitted on the current scale.
            edges[edges > 255] = 1

            # plt.imshow(edges, cmap=plt.get_cmap('gray'))
            # plt.show()

            edges_count = np.sum(np.sum(edges))

            df_flattened = df.flatten()

            # Uncomment this code to display the histogram of the colors of the images
            # plt.hist(df)
            # plt.show()

            # Whole-image statistics
            mean = df_flattened.mean()
            sd = df_flattened.std()
            max_val = df_flattened.max()
            min_val = df_flattened.min()
            median = np.median(df_flattened)
            trimmed_mean = stats.trim_mean(df_flattened, 0.2)
            skew = stats.skew(df_flattened)
            range_vals = max_val - min_val
            # NOTE(review): stored under "Avg" but this is the sum of all values, not an
            # average; kept unchanged so the feature matches what the saved model saw.
            avg_val = np.sum(df_flattened)

            # Find if poisson: flagged when the mean lies within the top quarter of the value range
            spread = max_val - min_val
            is_poisson = bool(max_val - mean < 0.25 * spread)

            # Left and right stats: split the values at the midpoint of the range
            # Left (strictly below the midpoint)
            middle_val = (max_val - min_val) / 2 + min_val
            left = df_flattened[df_flattened < middle_val]
            left_mean = left.mean()
            left_sd = left.std()
            left_median = np.median(left)
            left_skew = stats.skew(left)
            # Right (midpoint and above)
            right = df_flattened[df_flattened >= middle_val]
            right_mean = right.mean()
            right_sd = right.std()
            right_median = np.median(right)
            right_skew = stats.skew(right)

            new_row = {"Compression": level_i,  # index into compression_levels
                       "Train_validate": train_validate,
                       # Features
                       "Edges": edges_count,
                       "Mean": mean,
                       "Trimmed_mean": trimmed_mean,
                       "Median": median,
                       "Skew": skew,
                       "Is_poisson": is_poisson,
                       "SD": sd,
                       "Max": max_val,
                       "Min": min_val,
                       "Range": range_vals,
                       "Avg": avg_val,
                       "Left_mean": left_mean,
                       "Left_SD": left_sd,
                       "Left_median": left_median,
                       "Left_skew": left_skew,
                       "Right_mean": right_mean,
                       "Right_SD": right_sd,
                       "Right_median": right_median,
                       "Right_skew": right_skew
            }

            feature_rows.append(new_row)

# One construction for the whole table instead of one copy per row.
data_df = pd.DataFrame(feature_rows)

In [4]:
def clean_data(data_df, train_validate):
    """Split one partition of the feature table into features and labels.

    Keeps the rows whose "Train_validate" value equals ``train_validate`` and
    returns ``(X, y)``: ``X`` is those rows without the "Compression" and
    "Train_validate" columns, ``y`` is their "Compression" column.
    """
    subset = data_df[data_df["Train_validate"] == train_validate]
    features = subset.drop(columns=["Compression", "Train_validate"])
    labels = subset["Compression"]
    return features, labels

In [5]:
# Baseline: a random forest fitted on every feature column.
X_train, y_train = clean_data(data_df, "train")
X_test, y_test = clean_data(data_df, "validate")

clf = RandomForestClassifier(random_state=0)
clf.fit(X_train, y_train)

train_score = clf.score(X_train, y_train)
test_score = clf.score(X_test, y_test)
print(f"Train Accuracy: {round(train_score, 3)}")
print(f"Test Accuracy: {round(test_score, 3)}")

# List the features from most to least important.
print("\nFeature importances:")
sorted_pairs = sorted(zip(clf.feature_importances_, X_train.columns), reverse=True)
for importance, feature_name in sorted_pairs:
    print(f"{round(importance, 2)} {feature_name}")

Train Accuracy: 1.0
Test Accuracy: 0.789

Feature importances:
0.21 Min
0.11 Left_median
0.09 Median
0.08 Avg
0.07 Right_skew
0.06 Left_mean
0.05 Skew
0.05 Trimmed_mean
0.04 Mean
0.04 Right_median
0.03 Right_mean
0.03 Left_skew
0.03 Max
0.02 Edges
0.02 SD
0.02 Range
0.02 Right_SD
0.02 Left_SD
0.0 Is_poisson


In [6]:
def feature_selection(X_train, X_test, y_train, y_test):
    """Search selector / feature-count combinations and persist the best pair.

    For each selector class (SequentialFeatureSelector, RFE) and each feature
    count from 1 up to one less than the number of columns in ``X_train``, a
    RandomForestClassifier(random_state=0) is wrapped in the selector, the
    selector is fitted on the training data, and the classifier is then fitted
    on the reduced training columns and scored on the reduced test columns.
    The highest-scoring (model, selector) pair is pickled to
    ``pkls/best_model.pkl`` and ``pkls/best_selector.pkl`` and returned.
    On ties the earliest candidate wins.

    NOTE(review): candidates are ranked by their accuracy on ``X_test`` /
    ``y_test``, so the winner's score on that same split is an optimistic
    estimate; use a separate hold-out set for an unbiased figure.

    Args:
        X_train: training feature table (needs ``.shape[1]``).
        X_test: feature table used to score each candidate.
        y_train: labels for ``X_train``.
        y_test: labels for ``X_test``.

    Returns:
        ``(best_model, best_selector)``: the fitted classifier and the fitted
        selector it was trained behind.

    Raises:
        ValueError: if no candidate was evaluated (``X_train`` has fewer than
            two columns), so there is nothing meaningful to save.
    """
    feature_selectors = [SequentialFeatureSelector, RFE]
    models = [RandomForestClassifier]

    # Start below any possible accuracy so the first scored candidate is always
    # kept, even when every candidate scores 0.0.
    best_accuracy = float("-inf")
    best_model = None
    best_selector = None
    for model in models:
        for selector_method in feature_selectors:
            # The upper bound is exclusive: the full feature set is never tried here.
            for n_features in range(1, X_train.shape[1]):
                clf = model(random_state=0)
                selector = selector_method(clf, n_features_to_select=n_features)
                selector.fit(X_train, y_train)
                X_train_transformed = selector.transform(X_train)
                X_test_transformed = selector.transform(X_test)

                clf.fit(X_train_transformed, y_train)
                test_accuracy = clf.score(X_test_transformed, y_test)

                if test_accuracy > best_accuracy:
                    best_accuracy = test_accuracy
                    best_model = clf
                    best_selector = selector

    if best_model is None:
        # Fail before touching the pickles so a previous good result is not
        # overwritten with None.
        raise ValueError(
            "feature_selection evaluated no candidates; X_train needs at least 2 feature columns"
        )

    # The output directory may not exist on a fresh checkout.
    os.makedirs("pkls", exist_ok=True)

    with open(os.path.join("pkls", "best_model.pkl"), 'wb') as handle:
        pickle.dump(best_model, handle, protocol=pickle.HIGHEST_PROTOCOL)

    with open(os.path.join("pkls", "best_selector.pkl"), 'wb') as handle:
        pickle.dump(best_selector, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return best_model, best_selector

In [18]:
def get_best_model(X_train, X_test, y_train, y_test, retrain=False):
    """Return the ``(best_model, best_selector)`` pair.

    With ``retrain=True`` the pair is produced by ``feature_selection`` on the
    given data. Otherwise the data arguments are not used and the pair is
    unpickled from ``pkls/best_model.pkl`` and ``pkls/best_selector.pkl``.
    """
    if not retrain:
        loaded = []
        # Model first, selector second: the same order as the returned tuple.
        for pkl_name in ("best_model.pkl", "best_selector.pkl"):
            with open(os.path.join("pkls", pkl_name), 'rb') as handle:
                loaded.append(pickle.load(handle))
        best_model, best_selector = loaded
        return best_model, best_selector

    return feature_selection(X_train, X_test, y_train, y_test)

In [19]:
# retrain=False loads the pickled model/selector pair instead of re-running the search.
best_model, best_selector = get_best_model(X_train, X_test, y_train, y_test, retrain=False)

# Reduce both splits to the columns the selector kept.
X_train_transformed, X_test_transformed = (
    best_selector.transform(split) for split in (X_train, X_test)
)

train_accuracy = best_model.score(X_train_transformed, y_train)
test_accuracy = best_model.score(X_test_transformed, y_test)
features = best_selector.get_feature_names_out()

print(f"Train accuracy: {train_accuracy}")
print(f"Test accuracy: {test_accuracy}")

# List the kept features from most to least important.
print("Feature importance:")
sorted_pairs = sorted(zip(best_model.feature_importances_, features), reverse=True)
for importance, feature_name in sorted_pairs:
    print(f"{round(importance, 2)} {feature_name}")

Train accuracy: 0.995774647887324
Test accuracy: 0.8555555555555555
Feature importance:
0.15 Left_median
0.13 Median
0.12 Avg
0.11 Right_skew
0.09 Mean
0.09 Trimmed_mean
0.07 Right_mean
0.07 SD
0.07 Left_SD
0.06 Right_median
0.05 Edges


In [16]:
# Save features as csv
# "Compression" holds the index into compression_levels: 0 = low, 1 = medium, 2 = high.
is_train = data_df["Train_validate"] == "train"
is_validate = data_df["Train_validate"] == "validate"

high_train = data_df[(data_df["Compression"] == 2.0) & is_train]
med_train = data_df[(data_df["Compression"] == 1.0) & is_train]
low_train = data_df[(data_df["Compression"] == 0.0) & is_train]
high_validate = data_df[(data_df["Compression"] == 2.0) & is_validate]
med_validate = data_df[(data_df["Compression"] == 1.0) & is_validate]
low_validate = data_df[(data_df["Compression"] == 0.0) & is_validate]

# Each level/split subset must appear exactly once: train subsets first, then
# validate. The list previously started with low_validate, which duplicated
# those rows and left the high-compression training rows out of the CSV.
combined_df_list = [high_train, med_train, low_train, high_validate, med_validate, low_validate]
combined_df = pd.concat(combined_df_list)

# Number the rows in their concatenated order.
combined_df['image_id'] = range(len(combined_df))

combined_df.to_csv("descriptive_stats.csv", index=False)