In [3]:
import pandas as pd
import cv2 as cv
import matplotlib.pyplot as plt
import sys

from PIL import Image
import numpy as np
import os
from tqdm import tqdm # for loading progress

import seaborn as sns #  for plotting

from sklearn.svm import SVC,LinearSVC, LinearSVR
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.preprocessing import normalize
from sklearn.decomposition import PCA

sys.path.append('../../elpv-dataset/utils')
from elpv_reader import load_dataset

from keras.applications.vgg19 import VGG19, preprocess_input
from keras.preprocessing import image
from keras.models import Model

# Design sketch (never executed): the intended wiring of the pipeline components.
# It is a bare string literal, so none of the constructors below are called.
"""image_preprocessor = ImagePreprocessor()
keypoint_detector = KeypointDetector() # SIFT_create()
feature_descriptor = FeatureDescriptor() # sift : .compute
encoder = VLAD_Encoder()
classifier = Classifier()
model = Model(keypoint_detector, feature_descriptor, encoder, classifier)"""


class DataSplitter:
    """Split the per-image descriptor table into train/test parts and encode them.

    Parameters
    ----------
    encoder : object
        Anything exposing ``encode(descriptors)``. It is called on the training
        descriptors first and on the test descriptors second, so an encoder that
        fits itself on its first call is fitted on the training part only.
    """

    def __init__(self, encoder):
        self.encoder = encoder

    def split(self, data):
        """Return a 75/25 train/test split of ``data``, stratified on ``label > 0``.

        Parameters
        ----------
        data : pd.DataFrame
            Must contain the columns ``img_index``, ``descriptors`` and ``label``.
            The frame is not modified.

        Returns
        -------
        tuple
            ``(X_train, X_test, y_train, y_test, sw_train, sw_test)`` where the
            ``X_*`` entries are the encoder outputs for the ``descriptors``
            column, ``y_*`` are the original ``label`` values and ``sw_*`` are
            the per-sample weights.
        """
        labels = data['label']

        # Stratification target: 1 when the label is positive, 0 otherwise.
        binary_label = (labels > 0).astype(int)

        # Weight is 1 for labels 0 and 1; every other label is used as its own weight.
        sample_weights = labels.where(~labels.isin([0, 1]), 1)

        X_train, X_test, y_train, y_test, sw_train, sw_test = train_test_split(
            data[['img_index', 'descriptors']],
            labels,
            sample_weights,
            test_size=0.25,
            random_state=42,
            stratify=binary_label,
        )

        # Order matters: the first call may fit the encoder, so train goes first.
        X_train = self.encoder.encode(X_train['descriptors'])
        X_test = self.encoder.encode(X_test['descriptors'])
        return X_train, X_test, y_train, y_test, sw_train, sw_test


class VLAD_Encoder:
    """VLAD encoder built on several k-means codebooks, followed by whitened PCA.

    Each codebook is a ``MiniBatchKMeans`` fitted on a random subset of the
    descriptors. An image is encoded once per codebook (sum of residuals to the
    assigned centre, signed power normalisation with exponent ``p``, then L2
    normalisation) and the per-codebook vectors are concatenated. The
    concatenated vectors are finally projected by ``PCA(whiten=True,
    n_components=128)``.

    Parameters
    ----------
    subset_fraction : float
        Fraction of all descriptors drawn (without replacement) per codebook.
    p : float
        Exponent of the signed power normalisation.
    k : int
        Number of clusters (visual words) per codebook.
    num_random_subsets : int
        Number of codebooks.
    """

    def __init__(self, subset_fraction=0.25, p=0.5, k=10, num_random_subsets=5):
        self.codebook = {}  # subset index -> fitted MiniBatchKMeans
        self.num_random_subsets = num_random_subsets
        self.p = p
        self.m = num_random_subsets
        self.subset_fraction = subset_fraction  # share of descriptors sampled per codebook
        self.k = k  # codebook size, i.e. number of words
        self.pca = PCA(whiten=True, n_components=128)

    def _create_codebook(self, all_descriptors):
        """Fit one k-means model per random subset of ``all_descriptors``.

        ``all_descriptors`` must be a single 2-D array with one descriptor per
        row (i.e. already flattened across images).
        """
        num_descriptors = all_descriptors.shape[0]
        subset_size = round(num_descriptors * self.subset_fraction)

        for i in range(self.num_random_subsets):
            # NOTE: the subset draw uses NumPy's global RNG and is therefore only
            # reproducible if the caller seeds it; k-means itself is seeded with i.
            subset_indices = np.random.choice(num_descriptors, subset_size, replace=False)
            kmeans = MiniBatchKMeans(n_clusters=self.k, random_state=i, batch_size=256*4)  # 256*#cores
            # NOTE(review): the original comment here said "replace with only test
            # set samples" -- presumably a TODO about which samples to fit on; confirm.
            kmeans.fit(all_descriptors[subset_indices])
            self.codebook[i] = kmeans

    def _encode(self, descriptors):
        """Encode the descriptors of one image against every codebook.

        Returns an array of shape ``(1, m * k * d)`` where ``m`` is the number of
        codebooks and ``d`` the descriptor dimensionality.
        """
        descriptors = np.asarray(descriptors)
        per_codebook = []

        for kmeans_ in self.codebook.values():
            cluster_assignments = kmeans_.predict(descriptors)
            residuals = descriptors - kmeans_.cluster_centers_[cluster_assignments]

            # Accumulate residuals per cluster; np.add.at handles repeated indices.
            vlad = np.zeros((self.k, descriptors.shape[1]))  # (k, d)
            np.add.at(vlad, cluster_assignments, residuals)

            vlad = np.sign(vlad) * np.abs(vlad) ** self.p
            per_codebook.append(normalize(vlad.reshape(1, -1), axis=1, norm='l2'))  # (1, k*d)

        return np.hstack(per_codebook)

    def encode(self, all_descriptors):
        """Encode a collection of per-image descriptor arrays.

        Parameters
        ----------
        all_descriptors : iterable of array-like
            One 2-D descriptor array per image (for example a DataFrame column).

        Returns
        -------
        np.ndarray
            One PCA-projected VLAD vector per image.

        Notes
        -----
        On the first call (empty codebook) the codebooks and the PCA are fitted
        on the given descriptors; later calls only transform.

        Raises
        ------
        ValueError
            If ``all_descriptors`` is empty.
        """
        per_image = list(all_descriptors)
        if not per_image:
            raise ValueError("all_descriptors is empty; nothing to encode")

        fit_transform = False
        if not self.codebook:
            # Descriptors arrive grouped by image; k-means needs one flat array.
            self._create_codebook(np.vstack(per_image))
            fit_transform = True

        all_vlad_vectors = np.vstack([self._encode(d) for d in per_image])

        if fit_transform:
            return self.pca.fit_transform(all_vlad_vectors)
        return self.pca.transform(all_vlad_vectors)
        






    

class FeatureDescriptor:
    """Compute local descriptors at given keypoints for a batch of images.

    Parameters
    ----------
    descriptor_instance : object
        Must expose ``compute(image, keypoints)`` returning a pair whose second
        element is the descriptors for that image.
    """

    def __init__(self, descriptor_instance):
        self.descriptor_instance = descriptor_instance
        self.num_descriptors = 0  # total descriptor rows from the last compute() call

    def compute(self, images, keypoints):
        """Return the descriptors of every image, in input order.

        Parameters
        ----------
        images : sequence
            The images to describe.
        keypoints : sequence
            One keypoint collection per image; must be as long as ``images``.

        Returns
        -------
        list
            One entry per image: whatever ``descriptor_instance.compute``
            returned as descriptors for it (may be ``None``).

        Side effects
        ------------
        ``self.num_descriptors`` is set to the total number of descriptor rows
        over all images (``None`` entries count as zero).
        """
        assert len(images) == len(keypoints), "images and keypoints must have the same length"
        all_descriptors = [self._compute(img, kp) for img, kp in zip(images, keypoints)]
        self.num_descriptors = sum(len(d) for d in all_descriptors if d is not None)
        return all_descriptors

    def _compute(self, image, keypoints):
        """Return only the descriptors (second element) for one image."""
        return self.descriptor_instance.compute(image, keypoints)[1]

class DataLoader:
    """Load the image dataset and build its metadata table.

    Parameters
    ----------
    load_dataset : callable
        Called as ``load_dataset(filepath)``; must return ``(images, probas, types)``.
    filepath : str
        Path handed to ``load_dataset``.
    """

    def __init__(self, load_dataset, filepath):
        self.load_dataset = load_dataset
        self.filepath = filepath

    def load(self) -> tuple:
        """Load the data and reduce it if required.

        Returns
        -------
        tuple
            ``(images, reduced_df)`` where ``images`` is exactly what
            ``load_dataset`` returned and ``reduced_df`` is the result of
            ``reduce_dataset`` applied to a ``pd.DataFrame`` with the columns
            ``[img_id, type, proba]``.

        Notes
        -----
        ``reduce_dataset`` is not defined in the visible part of this notebook;
        it must already exist in the namespace when ``load`` is called.
        """
        # The dataset is loaded exactly once. The previous second call read
        # self.args / self.kwargs, which are never set on this class.
        images, probas, types = self.load_dataset(self.filepath)
        img_ids = np.arange(images.shape[0])
        df = pd.DataFrame({'img_id': img_ids, 'type': types, 'proba': probas})
        # NOTE(review): 1.0 presumably keeps the full dataset (the earlier comment
        # claimed 50%) -- confirm against reduce_dataset's definition.
        reduced_df = reduce_dataset(df, 1.0)
        return images, reduced_df

class ImagePreprocessor:
    """Placeholder for the image preprocessing stage; no behaviour is defined yet."""

class KeypointDetector:
    """Placeholder for a keypoint detection stage; no behaviour is defined yet."""

class DenseSampler:
    """Place keypoints on a regular grid instead of detecting them.

    Parameters
    ----------
    grid_size : int
        Side length of a grid cell in pixels; also used as the keypoint size.
    """

    def __init__(self, grid_size=60):
        self.grid_size = grid_size

    def detect(self, images: np.ndarray) -> list:
        """Return one tuple of ``cv.KeyPoint`` per image, in input order.

        ``images`` is iterated image by image (first axis for an array).
        """
        # Iterate over the images themselves; do not index with them.
        return [self._detect(img) for img in tqdm(images, desc="Densley sampling image keypoints")]

    def _detect(self, image):
        """Return the centres of all full grid cells of ``image`` as keypoints.

        Cells are enumerated row by row. Pixels beyond the last full cell in
        either direction get no keypoint.
        """
        assert image is not None
        height, width = image.shape[:2]
        # x runs along the width (columns), y along the height (rows).
        n_cells_x = width // self.grid_size
        n_cells_y = height // self.grid_size
        half_cell = self.grid_size / 2

        # The centre of each grid cell becomes a keypoint of size grid_size.
        return tuple(
            cv.KeyPoint(x * self.grid_size + half_cell, y * self.grid_size + half_cell, self.grid_size)
            for y in range(n_cells_y)
            for x in range(n_cells_x)
        )


# Disabled end-to-end driver (never executed): it is a bare string literal.
# It shows the intended order of the stages: load -> dense keypoints -> SIFT
# descriptors -> DataFrame -> drop images without descriptors -> VLAD encode + split.
"""images, probas, types = load_dataset('./KOC/koc/data/labels.csv')
keypoint_detector = DenseSampler(grid_size=60)
keypoints = keypoint_detector.detect(images)

sift = cv.SIFT_create()
feature_descriptor = FeatureDescriptor(sift)
descriptors = feature_descriptor.compute(images, keypoints) # List[np.ndarray(x, 128)]

img_indices = np.arange(images.shape[0])
data = pd.DataFrame({
    'img_index': img_indices,
    'descriptors': descriptors,
    'label': probas,
    'type': types
})

# Remove undetected keypoints
data = data[data['descriptors'].apply(lambda d: d is not None and len(d) > 0)]

encoder = VLAD_Encoder(p=0.5, k=100, num_random_subsets=5)
#encoder.encode(feature_descriptor.num_descriptors)

data_splitter = DataSplitter(encoder)
X_train, X_test, y_train, y_test, sw_train, sw_test = data_splitter.split(data)"""


class Classifier:
    """Thin wrapper around an estimator object.

    Parameters
    ----------
    clf : object
        Estimator exposing ``fit`` (and ``predict`` if ``predict`` is used).
    optimiser : object, optional
        Stored on the instance as ``self.optimiser``; not used by this class.
    """

    def __init__(self, clf, optimiser=None):
        self.clf = clf
        self.optimiser = optimiser

    def fit(self, X, y=None, **fit_params):
        """Fit the wrapped estimator and return ``self``.

        Parameters
        ----------
        X : array-like
            Training features.
        y : array-like, optional
            Training targets. When omitted, ``clf.fit(X)`` is called, as before.
        **fit_params
            Extra keyword arguments (for example ``sample_weight``) passed
            through to ``clf.fit``.
        """
        if y is None:
            self.clf.fit(X, **fit_params)
        else:
            self.clf.fit(X, y, **fit_params)
        return self

    def predict(self, X=None):
        """Return ``clf.predict(X)``.

        Calling without ``X`` returns ``None``, which is what the previous
        zero-argument stub did.
        """
        if X is None:
            return None
        return self.clf.predict(X)

#clfs = {'svm': LinearSVC, 'rf': RandomForestClassifier}




# NOTE: this class reuses the name `Model` that the import block binds to
# keras.models.Model; after this definition the keras class is no longer
# reachable under that name.
class Model:
    """Chain a keypoint detector and a feature descriptor.

    Parameters
    ----------
    keypoint_detector : object
        Must expose ``detect(images)``.
    feature_descriptor : object
        Must expose ``compute(images, keypoints)``.
    encoder, clf : object
        Stored on the instance; not used by ``run`` yet.
    """

    def __init__(self, keypoint_detector, feature_descriptor, encoder, clf):
        self.keypoint_detector = keypoint_detector
        self.feature_descriptor = feature_descriptor
        self.encoder = encoder
        self.clf = clf

    def run(self, images=None):
        """Detect keypoints on ``images`` and return their descriptors.

        Parameters
        ----------
        images : sequence, optional
            Images to process. When omitted, the collaborators are called with
            no arguments, exactly as the previous zero-argument ``run`` did.

        Returns
        -------
        The value returned by ``feature_descriptor.compute``.
        """
        if images is None:
            # Legacy zero-argument form, kept for backward compatibility.
            keypoints = self.keypoint_detector.detect()
            return self.feature_descriptor.compute()

        keypoints = self.keypoint_detector.detect(images)
        return self.feature_descriptor.compute(images, keypoints)



In [4]:
# Load the dataset through load_dataset's default path (no filename is passed).
# NOTE(review): the recorded output of this cell is
# "OSError: ../../elpv-dataset/data/labels.csv not found." -- the dataset files
# must exist at that path before this cell can run.
# NOTE(review): the loaded probabilities are named `probs` here but `probas` in
# DataLoader.load above; confirm which name later cells expect.
images, probs, types = load_dataset()
keypoint_detector = DenseSampler(grid_size=60)


OSError: ../../elpv-dataset/data/labels.csv not found.