In [1]:
import pickle
import numpy as np
from pathlib import Path
import tensorflow.keras as keras

from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

from predictive_model import le # Import label encoder

class FeedbackSystem:
    """
    Interactive human-in-the-loop wrapper around a PCA + KMeans pipeline.

    On construction the pre-computed VGG16 fc1 features are loaded from disk,
    a PCA is fitted on them and a KMeans model is fitted on the reduced
    features.  `process_feedback` predicts a label for one image, asks the
    user (via `input`) whether it is right and stores corrections;
    `retrain_model` refits KMeans using the stored corrections.

    Attributes
    -------------
    model: fitted KMeans estimator (None until `load_initial_model` has run)
    pca: fitted PCA estimator (None until `load_initial_model` has run)
    data: list of raw feature vectors collected from user corrections
    labels: list of label strings, parallel to `data`

    Notes
    -------------
    The label encoder `le` imported from `predictive_model` is shared module
    state: adding a new class mutates `le.classes_` for every user of it.
    """

    def __init__(self):
        self.model = None
        self.pca = None
        self.data = []
        self.labels = []
        # PCA-reduced features the current KMeans model was fitted on; kept so
        # that retraining extends the training set instead of replacing it.
        self._train_features = None
        self.load_initial_model()

    def load_initial_model(self):
        """Load the saved fc1 features and fit the initial PCA and KMeans models."""
        # The file name must match the one written by the feature-extraction
        # cell ('fc1' with the digit one, not the letter 'l').
        # NOTE: pickle.load can execute arbitrary code; only load trusted files.
        with open('../data/features/VGG16_fc1_features_std.pickle', 'rb') as f:
            features = pickle.load(f)['features']
        self.pca = PCA(n_components=35, whiten=True, svd_solver='full')
        self.pca.fit(features)
        reduced = self.pca.transform(features)
        self.model = KMeans(n_clusters=7, init='k-means++', n_init=500)
        self.model.fit(reduced)
        self._train_features = reduced

    @staticmethod
    def _ask_yes_no(prompt):
        """Return True when the user answers 'y' (case and surrounding spaces ignored)."""
        return input(prompt).strip().lower() == 'y'

    def process_feedback(self, image_features):
        """
        Predict a label for one image and record the user's correction, if any.

        Parameters
        -------------
        image_features: 1-D feature vector
            raw (un-reduced) features of a single image; it is passed through
            the fitted PCA before prediction

        Returns
        -------------
        None.  When the prediction is rejected and a valid label is supplied,
        the features and label are appended to `self.data` / `self.labels`.
        """
        # Predict
        reduced_features = self.pca.transform([image_features])
        cluster = self.model.predict(reduced_features)[0]
        # NOTE(review): the KMeans cluster index is used directly as an encoded
        # label here. Nothing in this class aligns cluster ids with the
        # encoder's class order, and inverse_transform will fail for a cluster
        # id that has no class -- confirm the intended mapping.
        predicted_label = le.inverse_transform([cluster])[0]

        # Get feedback
        if self._ask_yes_no(f"Predicted: {predicted_label}. Correct? (y/n): "):
            print("Feedback recorded. Thank you!")
            return

        if self._ask_yes_no("Is this a new class? (y/n): "):
            label = input("Enter class name: ").strip()
            if label not in le.classes_:
                le.classes_ = np.append(le.classes_, label)
                print(f"New class '{label}' added.")
            else:
                print("Class already exists.")
        else:
            label = input("Enter correct label: ").strip()
            # Validate label exists
            if label not in le.classes_:
                print("Error: Label does not exist.")
                return

        # Store data for retraining
        self.data.append(image_features)
        self.labels.append(label)
        print("Feedback stored. Retrain when ready.")

    def retrain_model(self):
        """
        Refit KMeans on the previous training features plus the stored feedback.

        The number of clusters is set to the current number of encoder classes.
        Does nothing (apart from printing a message) when no feedback has been
        stored, or when there are fewer samples than clusters, because KMeans
        cannot be fitted in that case.  On success the stored feedback is
        cleared.
        """
        if not self.data:
            print("No new data to retrain.")
            return

        # Project the collected feedback into the same PCA space as the model.
        new_features = self.pca.transform(np.asarray(self.data))
        if self._train_features is None:
            training = new_features
        else:
            # Fitting on the feedback alone would discard everything the
            # initial model was trained on, so extend the training set instead.
            training = np.vstack([self._train_features, new_features])

        n_clusters = len(le.classes_)
        if len(training) < n_clusters:
            print("Not enough samples to retrain; collect more feedback first.")
            return

        # Retrain KMeans
        self.model = KMeans(
            n_clusters=n_clusters,
            init='k-means++',
            n_init=100
        )
        self.model.fit(training)
        self._train_features = training
        # Reset stored data
        self.data, self.labels = [], []
        print("Model retrained with new data.")

In [None]:
# Location of the locally cached copy of the full VGG16 network.
vgg16_path = Path('..') / 'models' / 'VGG16.h5'

if vgg16_path.is_file():
    # A saved copy already exists: load it from disk.
    vgg16 = keras.models.load_model(vgg16_path)
else:
    # No saved copy yet: build VGG16 with its fully connected layers and the
    # ImageNet weights, then save it so it is not downloaded again next time.
    vgg16 = keras.applications.VGG16(weights='imagenet', include_top=True)
    vgg16.save(vgg16_path)

In [None]:
def layer_extractor(model=vgg16, layer='fc1'):
    """
    returns a model that will extract the outputs of *layer* from *model*.
    
    Parameters
    -------------
    model: keras model
        full model from which intermediate layer will be extracted
    layer: string
        name of layer from which to extract outputs
    
    Returns
    -------------
    new_model: keras model
        feature extractor model which takes the same inputs as *model* and returns the outputs
        of the intermediate layer specified by *layer* by calling new_model.predict(inputs)

    Raises
    -------------
    AssertionError
        if *model* has no layer named *layer*
    """
    layer_names = [x.name for x in model.layers]
    assert layer in layer_names, f"layer '{layer}' not found in model"  # make sure the layer exists

    # Build the extractor from the *model* argument rather than the global
    # vgg16, so that a caller-supplied model is actually the one used.
    new_model = keras.Model(inputs=model.input, outputs=[model.get_layer(layer).output])
    
    return new_model

In [None]:
# Build the fc1 feature extractor and run it over the image batch.
# NOTE(review): `images`, `files` and `labels` are not defined in the visible
# part of this notebook -- they must be created by an earlier cell.
fc1_extractor = layer_extractor()
fc1 = fc1_extractor.predict(images, verbose=True)

# save results
results = {'filename' : files,
           'features': fc1,
           'labels': labels,
           'layer_name': 'fc1'
          }

feature_dir = Path('..','data','features')
# Path.mkdir is used instead of os.makedirs because `os` is never imported in
# this notebook; parents=True/exist_ok=True gives the same behaviour.
feature_dir.mkdir(parents=True, exist_ok=True)
with open(feature_dir / 'VGG16_fc1_features_std.pickle', 'wb') as f:
    pickle.dump(results, f)

print(fc1.shape)

In [None]:
# Example usage in feedback loop:
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.preprocessing import image

def extract_features(image_path):
    """
    Compute the fc1 feature vector for a single image file.

    Parameters
    -------------
    image_path: str or path-like
        image file to load; it is resized to 224x224 on load

    Returns
    -------------
    features: 1-D numpy array
        flattened output of `fc1_extractor.predict` for this image
    """
    img = image.load_img(image_path, target_size=(224, 224))
    img_array = image.img_to_array(img)
    img_array = preprocess_input(img_array)
    # The extractor expects a batch, so add a leading batch axis of size 1.
    # The extractor is named `fc1_extractor` (digit one), as defined in the
    # feature-extraction cell.
    features = fc1_extractor.predict(np.expand_dims(img_array, axis=0))
    return features.flatten()

In [None]:
# Initialize system: the constructor loads the saved features from disk and
# fits the initial PCA and KMeans models.
fb_system = FeedbackSystem()

# For a new image (placeholder path -- replace with a real image file):
image_path = "path/to/new_image.bmp"
features = extract_features(image_path)

# Get prediction and feedback; this prompts interactively via input().
fb_system.process_feedback(features)

# After collecting enough feedback, refit the clustering model using the
# stored corrections. Prints a message and does nothing if none were stored.
fb_system.retrain_model()

In [None]:
# Example usage
# NOTE(review): this re-creates FeedbackSystem and therefore refits the
# initial models; it overlaps with the usage cell above -- confirm both are needed.
fb_system = FeedbackSystem()
new_image_features = [...]  # Placeholder: replace with the feature vector extracted for a new image
fb_system.process_feedback(new_image_features)
# After collecting enough feedback data (retrain_model takes no arguments; it
# uses the feedback stored on the instance):
# fb_system.retrain_model()