In [3]:
from sklearn.base import BaseEstimator
from sklearn.base import ClassifierMixin
from sklearn.metrics import accuracy_score
from sklearn.base import clone
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import _name_estimators
import numpy as np
import operator
class MajorityVoteClassifier(BaseEstimator, ClassifierMixin):
    """Ensemble of per-label binary classifiers combined by (weighted) voting.

    Parameters
    ----------
    classifiers : sequence of dict
        Each entry is ``{'model': estimator, 'label': class_label}``. During
        ``fit`` a clone of ``model`` is trained to separate ``label`` from
        every other label.
    vote : {'classlabel', 'probability'}, default='classlabel'
        Validated in ``fit``. ``predict`` currently always votes on the
        predicted labels, whichever value is given.
    weights : sequence of float or None, default=None
        Optional per-classifier weights passed to ``np.bincount`` when the
        votes are counted. Must have one entry per classifier.
    """

    def __init__(self, classifiers, vote='classlabel', weights=None):
        self.classifiers = classifiers
        self.named_classifiers = {key: value for key, value in _name_estimators(classifiers)}
        self.vote = vote
        self.weights = weights

    def fit(self, X, y):
        """Fit one binary (label-vs-rest) clone per configured classifier.

        Raises
        ------
        ValueError
            If ``vote`` is not a supported mode, or if ``weights`` is given
            and its length differs from the number of classifiers.
        """
        if self.vote not in ('probability', 'classlabel'):
            raise ValueError("vote must be 'probability' or 'classlabel' ; got (vote=%r)" % self.vote)
        # Compare against None explicitly: truth-testing an ndarray raises,
        # and an empty list would otherwise skip the length check.
        if self.weights is not None and len(self.weights) != len(self.classifiers):
            raise ValueError('Number of classifiers and weights must be equal'
                             '; got %d weights, %d classifiers' % (len(self.weights), len(self.classifiers)))

        self.classifiers_ = []
        for clf in self.classifiers:
            # 1 where the sample carries this classifier's label, 0 elsewhere.
            binary_labels = (y == clf['label']).astype(int)
            fitted_clf = clone(clf['model']).fit(X, binary_labels)
            self.classifiers_.append({'model': fitted_clf, 'label': clf['label']})
        return self

    def predict(self, X):
        """Return the (weighted) majority vote over the fitted classifiers.

        Each fitted model predicts its own 0/1 target; for every sample the
        value with the largest weighted count is returned.

        NOTE(review): the result is therefore the winning binary value, not
        one of the original class labels stored in ``clf['label']`` - confirm
        this is the intended output before comparing it with multi-class
        targets in ``evaluate``.
        """
        # Shape after transpose: one row per sample, one column per classifier.
        predictions = np.asarray([clf['model'].predict(X) for clf in self.classifiers_]).T
        return np.apply_along_axis(
            lambda votes: np.argmax(np.bincount(votes, weights=self.weights)),
            axis=1,
            arr=predictions,
        )

    def evaluate(self, X_test, Y_test):
        """Return the accuracy of ``predict(X_test)`` against ``Y_test``."""
        y_predict = self.predict(X_test)
        return accuracy_score(Y_test, y_predict)


In [None]:
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import cross_val_score, StratifiedKFold, KFold
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
import numpy as np
from sklearn.metrics import accuracy_score
import numpy.core.defchararray as np_f
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
from skopt import BayesSearchCV
from sklearn.multiclass import OneVsRestClassifier
from sklearn.model_selection import train_test_split


def mix_aug_data(X_train, Y_train, AUG_NAME_MODIFIER):
    """Prepend augmented-image paths to the originals and duplicate the labels.

    Every path in ``X_train`` has ``'.jpg'`` replaced by ``AUG_NAME_MODIFIER``.
    The renamed paths (same shape as ``X_train``) are stacked on top of the
    original paths along axis 0, and ``Y_train`` is stacked with itself so
    labels stay aligned with both halves.

    Returns
    -------
    tuple
        ``(paths, labels)`` with twice as many rows as the inputs.
    """
    renamed_paths = np.array(
        [original.replace('.jpg', AUG_NAME_MODIFIER) for original in X_train.flatten()]
    ).reshape(X_train.shape)
    doubled_paths = np.concatenate((renamed_paths, X_train), axis=0)
    doubled_labels = np.concatenate((Y_train, Y_train), axis=0)
    return doubled_paths, doubled_labels


def get_gray_scales(X_train, AUG_NAME_MODIFIER='_grayscale.jpg'):
    """Return an array shaped like ``X_train`` with renamed image paths.

    Each path has ``'.jpg'`` replaced by ``AUG_NAME_MODIFIER`` (by default the
    grayscale suffix); the input array is left untouched.
    """
    renamed_paths = [original.replace('.jpg', AUG_NAME_MODIFIER) for original in X_train.flatten()]
    return np.array(renamed_paths).reshape(X_train.shape)

def load_dict_HOG(path='./hog_features_grayscale.csv'):
    """Read the HOG feature CSV and index its rows by the first column.

    Parameters
    ----------
    path : str, default='./hog_features_grayscale.csv'
        Location of the CSV file. The first column is used as the key column
        and all remaining columns as the feature values.

    Returns
    -------
    The mapping built by ``make_HOG_dict`` from the key column and the
    feature columns.
    """
    df = pd.read_csv(path)
    xs = np.array(df.iloc[:, :1])
    ys = np.array(df.iloc[:, 1:])
    return make_HOG_dict(xs, ys)

def load_dict_HIST(path='./histogram_features_color.csv'):
    """Read the colour-histogram feature CSV and index its rows by the first column.

    Parameters
    ----------
    path : str, default='./histogram_features_color.csv'
        Location of the CSV file. The first column is used as the key column
        and all remaining columns as the feature values.

    Returns
    -------
    The mapping built by ``make_HIST_dict`` from the key column and the
    feature columns.
    """
    df = pd.read_csv(path)
    xs = np.array(df.iloc[:, :1])
    ys = np.array(df.iloc[:, 1:])
    return make_HIST_dict(xs, ys)

def make_HOG_dict(xs,ys):
    """Map the first element of each row of ``xs`` to the matching row of ``ys``.

    Rows are paired by position; if a key occurs more than once the last
    occurrence wins. Prints ``"loaded HOG"`` once the mapping is built.
    """
    features_by_key = {key_row[0]: ys[row_idx] for row_idx, key_row in enumerate(xs)}
    print("loaded HOG")
    return features_by_key


def make_HIST_dict(xs,ys):
    """Map the first element of each row of ``xs`` to the matching row of ``ys``.

    Rows are paired by position; if a key occurs more than once the last
    occurrence wins. Prints ``"loaded HIST"`` once the mapping is built.
    """
    features_by_key = {key_row[0]: ys[row_idx] for row_idx, key_row in enumerate(xs)}
    print("loaded HIST")
    return features_by_key



def train_and_evaluate(X_train, Y_train, X_eval, Y_eval, model):
    """Fit ``model`` on the training split and return its accuracy on the evaluation split.

    ``model`` is fitted in place; any estimator exposing ``fit`` and
    ``predict`` can be passed.
    """
    model.fit(X_train, Y_train)
    return accuracy_score(Y_eval, model.predict(X_eval))

def gridSearch(X_train, Y_train, X_eval, Y_eval, param_combinations):
    """Pick the SVC parameter set with the best accuracy on a held-out split.

    Both feature matrices are first projected with a PCA
    (``n_components=0.95``) fitted on the training split only. For every
    entry of ``param_combinations`` a ``OneVsRestClassifier(SVC(**params))``
    is trained on the projected training data and scored on the projected
    evaluation data.

    Parameters
    ----------
    X_train, Y_train : training features and labels.
    X_eval, Y_eval : evaluation features and labels.
    param_combinations : iterable of dict
        Keyword arguments forwarded to ``SVC``.

    Returns
    -------
    tuple
        ``(best_score, best_params)``. The first combination that reaches
        the highest accuracy wins. ``(0, None)`` is returned only when
        ``param_combinations`` is empty.
    """
    best_score = 0
    best_params = None

    # Fit the projection on the training split only, then reuse it for eval.
    pca = PCA(n_components=0.95)
    pca.fit(X_train)
    X_train = pca.transform(X_train)
    X_eval = pca.transform(X_eval)

    for params in param_combinations:
        # Fresh one-vs-rest SVM for the current parameter set.
        model = OneVsRestClassifier(SVC(**params, cache_size=10000))
        model.fit(X_train, Y_train)
        y_pred_eval = model.predict(X_eval)

        validation_accuracy = accuracy_score(Y_eval, y_pred_eval)

        # Accept the first candidate unconditionally so best_params is never
        # left as None when every candidate scores 0.0; callers unpack it
        # with ``SVC(**best_params)``.
        if best_params is None or validation_accuracy > best_score:
            best_score = validation_accuracy
            best_params = params

    return best_score, best_params


def get_data_dict(X):
    """Load every image listed in ``X`` in parallel and key it by file name.

    Parameters
    ----------
    X : iterable of sequences
        The first element of each entry is an image file name, looked up
        under ``train_new_ims/``.

    Returns
    -------
    dict
        Maps the file name (last path component) to whatever ``load_image``
        returns for that file.

    A progress percentage is rewritten on one console line while the images
    load, followed by the number of entries in the dictionary.
    """
    print("\nFETCHING IMAGES FROM DIRECTORY\n")

    image_paths = [f"train_new_ims/{img[0]}" for img in X]
    img_dict = {}
    total_images = len(image_paths)

    # Use ThreadPoolExecutor to load images in parallel
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(load_image, path): path for path in image_paths}

        # Progress is counted here, in the submitting thread, instead of in
        # done-callbacks: those may run on worker threads, where an
        # unsynchronised ``+=`` on a shared counter can lose updates.
        for loaded_images, future in enumerate(as_completed(futures), start=1):
            img_path = futures[future]
            img_dict[img_path.split('/')[-1]] = future.result()

            percentage = (loaded_images / total_images) * 100
            print('\r\033[K', end='')
            print(f"\rProgress: {percentage:.2f}%", end='')

    print(len(img_dict))
    return img_dict


def load_image(path):
    """Load an image, standardise it per image, and return it flattened.

    The image is read with Keras, converted to an array, shifted by its own
    mean and divided by its own standard deviation, then flattened to 1-D.

    A constant image has zero standard deviation; in that case the centred
    (all-zero) array is returned without dividing, so the result never
    contains NaN values.
    """
    img_ = tf.keras.preprocessing.image.load_img(path)  # Load image
    img_array = tf.keras.preprocessing.image.img_to_array(img_)  # Convert to array
    mean = np.mean(img_array)
    std_dev = np.std(img_array)
    centered = img_array - mean
    if std_dev == 0:
        # Every pixel equals the mean: dividing would give 0/0 = NaN.
        return centered.flatten()
    return (centered / std_dev).flatten()




def get_data_from_dict(X_image_paths, all_images):
    """Look up the stored array for each path and stack the flattened results.

    Parameters
    ----------
    X_image_paths : array of keys; it is flattened before the lookup.
    all_images : dict mapping a key to an array.

    Returns
    -------
    numpy.ndarray
        One flattened row per key that is present in ``all_images``, in
        input order. Keys missing from the dictionary are skipped silently,
        so the result can have fewer rows than ``X_image_paths``.
    """
    rows = []
    for key in X_image_paths.flatten():
        if key not in all_images:
            continue
        rows.append(all_images[key].flatten())
    return np.array(rows)






def baselineTest(xs, ys):
    """Train and score a single HOG + colour-histogram SVM baseline.

    The data is split 90/10 (``random_state=42``). HOG features are looked up
    through the grayscale file names and histogram features through the
    original file names; the two are joined column-wise, reduced with a PCA
    (``n_components=0.9``) fitted on the training part, and fed to a
    one-vs-rest ``SVC(C=10, class_weight='balanced')``. The test accuracy is
    printed; nothing is returned.

    Parameters
    ----------
    xs : array of image file names (one per row).
    ys : array of labels aligned with ``xs``.
    """
    X_train, X_test, y_train, y_test = train_test_split(xs, ys, test_size=0.1, random_state=42)

    # The raw pixel data is not used by this baseline, so the images are not
    # loaded from disk here.

    # HOG features are keyed by the grayscale variant of each file name.
    all_hogs = load_dict_HOG()
    X_train_GS = get_gray_scales(X_train)
    X_test_GS = get_gray_scales(X_test)
    X_train_HOG = get_data_from_dict(X_train_GS, all_hogs)
    X_test_HOG = get_data_from_dict(X_test_GS, all_hogs)

    # Histogram features are keyed by the original file names.
    all_hists = load_dict_HIST()
    X_train_HIST = get_data_from_dict(X_train, all_hists)
    X_test_HIST = get_data_from_dict(X_test, all_hists)

    # Join both feature sets side by side: one row per sample.
    X_train_HH = np.hstack((X_train_HOG, X_train_HIST))
    X_test_HH = np.hstack((X_test_HOG, X_test_HIST))

    # Fit the projection on the training part only.
    pca = PCA(n_components=0.9)  # Try varying this value
    pca.fit(X_train_HH)
    X_train_HH = pca.transform(X_train_HH)
    X_test_HH = pca.transform(X_test_HH)

    model = OneVsRestClassifier(SVC(C=10, class_weight='balanced' ,verbose=True))
    model.fit(X_train_HH, y_train)
    y_pred_HIST_HOG = model.predict(X_test_HH)
    baseline_score = accuracy_score(y_test, y_pred_HIST_HOG)

    print(f"Baseline Model Accuracy: {baseline_score:.4f}")




def complexSol(xs,ys):
    """Nested cross-validation of a two-model SVM ensemble.

    Only the first 1000 samples are used. For each of the two outer folds:

    1. Two inner folds tune SVC parameters (via ``gridSearch``) separately
       for a raw-pixel model and for a histogram + HOG model. Training data
       is doubled with the ``_augmented.jpg`` variants; validation data is
       not augmented.
    2. The best parameter set found for each model is refitted on the
       augmented outer training fold.
    3. Both models vote on the outer test fold and the accuracy of the
       combined prediction is printed.

    The mean ensemble accuracy over the outer folds is printed at the end;
    nothing is returned.

    Parameters
    ----------
    xs : array of image file names (one per row).
    ys : array of labels aligned with ``xs``.
    """
    xs = xs[:1000]
    ys = ys[:1000]

    outer_cv = StratifiedKFold(n_splits=2, shuffle=True, random_state=42)
    inner_cv = StratifiedKFold(n_splits=2, shuffle=True, random_state=22)
    all_hogs = load_dict_HOG()
    all_hists = load_dict_HIST()
    # Only the paths are needed here; the duplicated labels are discarded.
    all_image_paths, _ = mix_aug_data(xs,ys, "_augmented.jpg")
    print(f"\n\nTOTAL SAMPLES: {len(all_image_paths)}")
    all_images = get_data_dict(all_image_paths)

    param_grid = {
        'C': [0.01, 0.1, 0.2, 1, 10, 100],  # Regularization parameter
        'kernel': ['rbf'],   # SVM kernel types
        'gamma': ['auto']      # Kernel coefficient
    }

    # Create a list of parameter combinations
    param_combinations = [
        {'C': c, 'kernel': k, 'gamma': g}
        for c in param_grid['C']
        for k in param_grid['kernel']
        for g in param_grid['gamma']
    ]

    total_accuracy = 0
    number_of_test_rounds = 0
    ######### NESTED K FOLD ##############
    ######### TRAINING + TEST FOLDS ######
    for i, (train_index, test_index) in enumerate(outer_cv.split(xs,ys)):
        # Not pixels at this point, just image names.
        X_train, X_test = xs[train_index], xs[test_index]
        Y_train, Y_test = ys[train_index], ys[test_index]
        total_validation_accuracy_pixel = 0
        total_validation_accuracy_hist_hog = 0
        number_of_validation_rounds = 0

        print(f"OUTER SPLIT: training[{len(X_train)}]\ttesting[{len(X_test)}]\ttotal[{len(X_train) + len(X_test)}]")

        # Index 0 tracks the pixel model, index 1 the HIST/HOG model.
        best_across_validation = [{'best_score':None, 'best_params':None}, {'best_score':None, 'best_params':None}]

        ######### TRAINING + VALIDATION FOLDS ######
        for j, (inner_train_index, inner_val_index) in enumerate(inner_cv.split(X_train,Y_train)):
            X_inner_train, X_inner_val = X_train[inner_train_index], X_train[inner_val_index]
            y_inner_train, y_inner_val = Y_train[inner_train_index], Y_train[inner_val_index]
            print(f"\tINNER SPLIT: training[{len(X_inner_train)}]\tvalidation[{len(X_inner_val)}]\ttotal[{len(X_inner_train) + len(X_inner_val)}]")
            X_inner_train, y_inner_train = mix_aug_data(X_inner_train, y_inner_train, "_augmented.jpg")

            #####PIXEL TRAIN#####
            X_inner_train_pixel = get_data_from_dict(X_inner_train, all_images)
            X_inner_val_pixel = get_data_from_dict(X_inner_val, all_images)

            best_score_pixel, best_params_pixel = gridSearch(X_inner_train_pixel, y_inner_train, X_inner_val_pixel, y_inner_val, param_combinations)
            print(f"PIXEL\tOuter Fold {i + 1}, Inner Fold {j + 1}, Best Accuracy: {best_score_pixel:.4f}, Best params: {best_params_pixel}")
            if (best_across_validation[0]['best_score'] is None or best_score_pixel > best_across_validation[0]['best_score']):
                best_across_validation[0]['best_score'] = best_score_pixel
                best_across_validation[0]['best_params'] = best_params_pixel

            #####HIST HOG TRAIN#####
            X_inner_train_hist = get_data_from_dict(X_inner_train, all_hists)
            X_inner_val_hist = get_data_from_dict(X_inner_val, all_hists)

            X_inner_train_GS = get_gray_scales(X_inner_train)
            X_inner_val_GS = get_gray_scales(X_inner_val)
            X_inner_train_hog = get_data_from_dict(X_inner_train_GS, all_hogs)
            X_inner_val_hog = get_data_from_dict(X_inner_val_GS, all_hogs)

            # Join the two feature sets column-wise (one row per sample).
            # np.concatenate(a, b) would treat the second array as ``axis``.
            X_inner_train_HH = np.hstack((X_inner_train_hist, X_inner_train_hog))
            X_inner_val_HH = np.hstack((X_inner_val_hist, X_inner_val_hog))

            # NOTE(review): gridSearch applies its own PCA(0.95) on top of
            # this projection - confirm the double reduction is intended.
            pca = PCA(n_components=0.90)  # Try varying this value
            pca.fit(X_inner_train_HH)
            X_inner_train_HH = pca.transform(X_inner_train_HH)
            X_inner_val_HH = pca.transform(X_inner_val_HH)

            best_score_hist_hog, best_params_hist_hog = gridSearch(X_inner_train_HH, y_inner_train, X_inner_val_HH, y_inner_val, param_combinations)

            print(f"HIST HOG\tOuter Fold {i + 1}, Inner Fold {j + 1}, Best Accuracy: {best_score_hist_hog:.4f}, Best params: {best_params_hist_hog}")
            if (best_across_validation[1]['best_score'] is None or best_score_hist_hog > best_across_validation[1]['best_score']):
                best_across_validation[1]['best_score'] = best_score_hist_hog
                best_across_validation[1]['best_params'] = best_params_hist_hog

            total_validation_accuracy_pixel += best_score_pixel
            total_validation_accuracy_hist_hog += best_score_hist_hog
            number_of_validation_rounds += 1

        average_validation_acc_pixel = total_validation_accuracy_pixel/number_of_validation_rounds
        average_validation_acc_hist_hog = total_validation_accuracy_hist_hog/number_of_validation_rounds
        print(f"PIXEL\tAverage Validation Accuracy: {average_validation_acc_pixel:.4f}")
        print(f"HIST_HOG\tAverage Validation Accuracy: {average_validation_acc_hist_hog:.4f}")

        # One model per feature family, built from the best parameters seen
        # in the inner folds. Fall back to SVC defaults if the search
        # produced no parameter set.
        ensemble = [
            OneVsRestClassifier(SVC(**(classifier['best_params'] or {}), cache_size=10000))
            for classifier in best_across_validation
        ]

        X_train, Y_train = mix_aug_data(X_train, Y_train, "_augmented.jpg")
        X_train_GS = get_gray_scales(X_train)
        # Grayscale names for the test fold come from the test paths, not
        # from the training labels.
        X_test_GS = get_gray_scales(X_test)

        #####PIXEL TRAIN#####
        # NOTE(review): the pixel model was tuned on PCA-reduced data inside
        # gridSearch but is refitted here on unreduced pixels - confirm.
        X_train_pixel = get_data_from_dict(X_train, all_images)
        X_test_pixel = get_data_from_dict(X_test,all_images)
        ensemble[0].fit(X_train_pixel, Y_train)
        y_pred_pixel = ensemble[0].predict(X_test_pixel)

        #####HIST HOG TRAIN ####
        X_train_hist = get_data_from_dict(X_train, all_hists)
        X_test_hist = get_data_from_dict(X_test,all_hists)
        X_train_HOG = get_data_from_dict(X_train_GS, all_hogs)
        X_test_HOG = get_data_from_dict(X_test_GS, all_hogs)
        # Column-wise join, matching the layout used during the inner search.
        X_train_HH = np.hstack((X_train_hist, X_train_HOG))
        X_test_HH = np.hstack((X_test_hist, X_test_HOG))

        pca = PCA(n_components=0.90)  # Try varying this value
        pca.fit(X_train_HH)
        X_train_HH = pca.transform(X_train_HH)
        X_test_HH = pca.transform(X_test_HH)

        # The HIST/HOG features must be scored by the model trained on them.
        ensemble[1].fit(X_train_HH, Y_train)
        y_pred_hist_hog = ensemble[1].predict(X_test_HH)

        # Assumes labels are integers in [0, num_classes) - TODO confirm.
        num_classes = 10  # Adjust this based on your specific use case
        num_samples = len(y_pred_pixel)  # Number of samples to predict

        # Initialize a vote array
        vote_array = np.zeros((num_samples, num_classes), dtype=int)

        # Accumulate votes for each prediction
        vote_array[np.arange(num_samples), y_pred_pixel] += 1
        vote_array[np.arange(num_samples), y_pred_hist_hog] += 1

        # With two voters a disagreement is a tie; argmax then picks the
        # lower class index.
        final_predictions = np.argmax(vote_array, axis=1)
        print(final_predictions[:10])
        test_accuracy = accuracy_score(Y_test, final_predictions)
        print(f"ENSEMBLE Outer Fold {i+1} Accuracy: {test_accuracy:.4f}\n\n")
        total_accuracy += test_accuracy
        number_of_test_rounds +=1

    average_test_acc = total_accuracy/number_of_test_rounds
    print(f"ENSEMBLE Average Test Accuracy: {average_test_acc:.4f}")





# Load the training table: every column except the last becomes xs,
# the last column becomes ys.
df = pd.read_csv('./train.csv')
xs = np.array(df.iloc[:, :-1])
ys = np.array(df.iloc[:, -1])

# Run the baseline experiment on the loaded data.
baselineTest(xs,ys)

