## Semi-Supervised Classification With Pairwise Constraints

In [None]:
import numpy as np
import pandas as pd

from itertools import groupby

from scipy.optimize import linear_sum_assignment # Hungarian algorithm

from sklearn.calibration import CalibratedClassifierCV
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.feature_selection import VarianceThreshold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, balanced_accuracy_score, f1_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

### Experiment configuration

In [None]:
# Directory prefix prepended to every CSV file name read by the experiment.
folder = "../data/"

# Video identifiers; each one is interpolated into the names of the
# "<split>_<video>_frames.csv" and "<split>_<video>_<feature>.csv" files.
videos = [
    "Koi_5652_952_540",
    "Pigeons_29033_960_540_300f",
    "Pigeons_4927_960_540_600f",
    "Pigeons_8234_1280_720",
    "Pigs_49651_960_540_500f",
]

# Feature-set identifiers; each one selects a "<split>_<video>_<feature>.csv" file.
features = [
    "AE",
    "HOG",
    "LBP",
    "MN2",
    "RGB",
]

# (train, test) pairs of file prefixes: every half is used once for training
# and once for testing.
splits = [("h1", "h2"), ("h2", "h1")]

# Two boxes in consecutive groups of frames are linked into the same track
# only when their overlap is strictly greater than this value.
overlap_threshold = 0.5

# Display name -> estimator class. The experiment instantiates each class
# with its default arguments as the last step of a pipeline.
classifiers = {
    "Linear Discriminant Analysis": LinearDiscriminantAnalysis,
    "Logistic Regression": LogisticRegression,
    "Calibrated CV": CalibratedClassifierCV,
    "Quadratic Discriminant Analysis": QuadraticDiscriminantAnalysis,
    "Extra Tree Ensemble": ExtraTreesClassifier,
}


  ## Functions

In [None]:
def overlap(x1, y1, w1, h1, x2, y2, w2, h2):
    """
    Calculates the overlap (intersection over union) between two bounding boxes.

    Each box spans from (x, y) to (x + w, y + h).

    :param x1: x coordinate of the first box
    :param y1: y coordinate of the first box
    :param w1: width of the first box
    :param h1: height of the first box
    :param x2: x coordinate of the second box
    :param y2: y coordinate of the second box
    :param w2: width of the second box
    :param h2: height of the second box

    :return: the intersection area divided by the union area,
             or 0.0 when the union area is zero (both boxes are empty)
    """

    # Sides of the intersection rectangle; clamped at 0 for disjoint boxes.
    inter_w = max(0, min(x1 + w1, x2 + w2) - max(x1, x2))
    inter_h = max(0, min(y1 + h1, y2 + h2) - max(y1, y2))
    intersection = inter_w * inter_h

    union = w1 * h1 + w2 * h2 - intersection
    if union == 0:
        # Two degenerate boxes: there is nothing to overlap, avoid dividing by zero.
        return 0.0
    return intersection / union

In [None]:
def combine_features(X, objects_in_track):
    """
    Replaces the features of every instance by the mean over its track.

    Tracks with a single instance are left untouched. The input is not
    modified: the averaging is done on a copy.

    :param X: feature values of the instances (one row per instance)
    :param objects_in_track: dictionary whose values are the row positions
                             of the instances that form each track

    :return: a copy of X where rows of the same track share the averaged features
    """

    averaged = X.copy()
    shared_tracks = (members for members in objects_in_track.values() if len(members) > 1)
    for members in shared_tracks:
        # The column-wise mean of the track is written back to each of its rows.
        averaged.iloc[members] = averaged.iloc[members].mean(axis=0)
    return averaged

In [None]:
def combine_probabilities(y_probs, objects_in_track, classes):
    """
    Replaces the class probabilities of every instance by the mean over its track.

    Tracks with a single instance are left untouched. The input array is not
    modified: the averaging is done on a copy.

    :param y_probs: classifier probabilities for the instances (one row per instance)
    :param objects_in_track: dictionary whose values are the row indexes
                             of the instances that form each track
    :param classes: classes used to train the classifier, in column order

    :return: a tuple with the averaged probabilities and the predicted classes
    """

    pooled = y_probs.copy()
    for members in objects_in_track.values():
        if len(members) > 1:
            pooled[members, :] = pooled[members, :].mean(axis=0)

    # The prediction of each instance is the class with the highest pooled probability.
    winners = np.argmax(pooled, axis=1)
    return pooled, classes.take(list(winners))

In [None]:
def restricted_set_hungarian(probs, classes):
    """
    Restricted Set Classification for a set of objects that have to be of different classes

    When the independent (argmax) predictions are already all different, or
    when there are more objects than classes, the argmax predictions are
    returned. Otherwise the assignment of objects to distinct classes that
    maximizes the sum of log-probabilities is found with the Hungarian method.

    The input array is never modified.

    :param probs: the probabilities given by the classifier (one row per object, one column per class)
    :param classes: the classes seen by the classifier

    :return: a tuple with 1) the Hungarian method was used (0 or 1), and 2) the predicted classes
    """

    rows, cols = probs.shape
    preds = list(np.argmax(probs, axis=1))

    if rows > cols or len(preds) == len(set(preds)):
        return 0, classes.take(preds)

    # log(0) = -inf is expected here; it marks an assignment as forbidden.
    with np.errstate(divide="ignore"):
        costs = np.log(probs)

    try:
        _, col_ind = linear_sum_assignment(costs, maximize=True)
    except ValueError:
        # Too many -inf entries make the problem infeasible. Retry with the
        # smallest positive double added to the probabilities, working on a
        # copy so that the caller's array (often a view) is left intact.
        smoothed = probs + np.nextafter(0, 1)
        costs = np.log(smoothed)
        _, col_ind = linear_sum_assignment(costs, maximize=True)

    return 1, classes.take(list(col_ind))

In [None]:
def restricted_set_classification(y_probs, instances_by_frame, classes):
    """
    Applies Restricted Set Classification frame by frame.

    The instances of each frame are taken as the contiguous block of rows
    that goes from the first to the last index listed for that frame.
    Frames without instances are skipped.

    :param y_probs: the probabilities given by the classifier for the instances
    :param instances_by_frame: dictionary with the instance indexes of each frame
    :param classes: the classes seen by the classifier

    :return: a list with the predicted labels, in frame order
    """

    restricted_pred = []
    for group in instances_by_frame.values():
        if len(group) == 0:
            continue
        start, stop = group[0], group[-1] + 1
        _, frame_pred = restricted_set_hungarian(y_probs[start:stop], classes)
        restricted_pred.extend(frame_pred)

    # Every instance must have received exactly one prediction.
    assert len(restricted_pred) == len(y_probs)

    return restricted_pred

In [None]:
def frames_to_tracks(frames):
    """
    Groups the bounding boxes by frame and links them into tracks.

    Boxes of one group of frames are matched one-to-one (Hungarian method on
    the overlaps) with the boxes of the previous non-empty group. A matched
    pair whose overlap is greater than ``overlap_threshold`` belongs to the
    same track. Objects are identified by their row position in ``frames``.

    Rows are expected to be sorted by frame, so that all the boxes of a frame
    are consecutive.

    :param frames: data frame with one bounding box per row and the columns
                   'frame', 'x', 'y', 'width' and 'height'

    :return: a tuple with 1) which instances indexes are in each frame
             (frames without objects between two observed frames get an empty list),
             and 2) which objects form each track
    """

    # Extract the boxes once instead of slicing the data frame inside the loops.
    boxes = frames[['x', 'y', 'width', 'height']].values

    instances_by_frame = {}
    track_of_object = {position: position for position in range(len(frames))}
    prev_indexes = None
    prev_frame = None

    for frame, group in groupby(enumerate(frames['frame']), lambda x: x[1]):
        indexes = [position for position, _ in group]

        if prev_frame is not None:
            # Some frames can be without objects. setdefault avoids erasing a
            # frame that was already registered if the rows are not sorted.
            for fr in range(int(prev_frame) + 1, int(frame)):
                instances_by_frame.setdefault(fr, [])
        instances_by_frame[frame] = indexes

        if prev_indexes:
            overlaps = np.zeros((len(prev_indexes), len(indexes)))
            for r, i1 in enumerate(prev_indexes):
                for c, i2 in enumerate(indexes):
                    overlaps[r, c] = overlap(*boxes[i1], *boxes[i2])

            row_ind, col_ind = linear_sum_assignment(overlaps, maximize=True)
            for r, c in zip(row_ind, col_ind):
                if overlaps[r, c] > overlap_threshold:
                    # The current object inherits the track of its match.
                    track_of_object[indexes[c]] = track_of_object[prev_indexes[r]]

        prev_indexes = indexes
        prev_frame = frame

    objects_in_track = {}
    for obj, track in track_of_object.items():
        objects_in_track.setdefault(track, []).append(obj)

    return instances_by_frame, objects_in_track

## The experiment

In [None]:
# One row per (dataset, features, classifier, method, fold) with its scores.
results = pd.DataFrame(columns=["dataset", "features", "classifier", "method", "fold", 
                                "accuracy", "balanced_accuracy"])  

for classifier_name, classifier_method in classifiers.items():
    for video in videos:
        for train, test in splits:

            # Bounding boxes of the test half: they define which instances
            # share a frame and which instances belong to the same track.
            frames = pd.read_csv(f"{folder}{test}_{video}_frames.csv")
            frames.drop(["label", "image"], axis=1, inplace=True)
            instances_by_frame, objects_in_track = frames_to_tracks(frames)

            for feature in features:
                print(classifier_name, video, train, test, feature)

                # The last column of each file holds the label.
                train_DF = pd.read_csv(f"{folder}{train}_{video}_{feature}.csv")
                test_DF = pd.read_csv(f"{folder}{test}_{video}_{feature}.csv")
                X_train, y_train = train_DF.iloc[:, :-1], train_DF.iloc[:, -1]
                X_test, y_test = test_DF.iloc[:, :-1], test_DF.iloc[:, -1]

                pipe = make_pipeline(VarianceThreshold(), StandardScaler(), classifier_method())
                pipe.fit(X_train, y_train)

                def add_result(method, predictions):
                    """Appends to `results` the scores of `predictions` for the current run."""
                    results.loc[len(results)] = [
                        video, feature, classifier_name, method, train,
                        accuracy_score(y_test, predictions),
                        balanced_accuracy_score(y_test, predictions)]

                # Independent classification of every instance.
                y_pred = pipe.predict(X_test)
                y_probs = pipe.predict_proba(X_test)
                classes = pipe.classes_
                add_result("Independent", y_pred)

                # Different classes within each frame.
                restricted_pred = restricted_set_classification(y_probs, instances_by_frame, classes)
                add_result("Hungarian", restricted_pred)

                # B*: probabilities averaged within each track.
                y_probs_combined, y_pred_combined = combine_probabilities(y_probs, objects_in_track, classes)
                add_result("B*", y_pred_combined)

                # B: B* followed by the per-frame restriction.
                restricted_pred = restricted_set_classification(y_probs_combined, instances_by_frame, classes)
                add_result("B", restricted_pred)

                # A*: features averaged within each track before classifying.
                X_combined_test = combine_features(X_test, objects_in_track)
                y_combined_pred = pipe.predict(X_combined_test)
                y_combined_probs = pipe.predict_proba(X_combined_test)
                add_result("A*", y_combined_pred)

                # A: A* followed by the per-frame restriction.
                restricted_pred = restricted_set_classification(y_combined_probs, instances_by_frame, classes)
                add_result("A", restricted_pred)

                # Saved after every run so that partial results are not lost.
                results.to_csv("constrained_results.csv", index=None)

    display(results)

 ## Formatting the results in a table

In [None]:
# Build a table with one row per (classifier, method) and one column per
# dataset/feature combination, holding the accuracy averaged over the folds.
df = results.copy()

df['dataset'] += "_" + df['features']

df.drop(['features', 'balanced_accuracy'], axis=1, inplace=True)

# A frame filled row by row may keep the object dtype of its empty columns;
# make sure the values averaged below are numeric.
df['accuracy'] = df['accuracy'].astype(float)

df = df.groupby(['dataset', 'classifier', 'method'], 
                as_index=False)['accuracy'].mean()

display(df)

# Keyword arguments are required: recent pandas versions do not accept
# positional arguments in DataFrame.pivot.
df = df.pivot(index=['classifier', 'method'], columns='dataset', values='accuracy')

display(df)

df.to_csv("constrained_CV.csv")
