# core

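> Defines the `Experiment` class, which bundles the configuration of a single likelihood-ratio experiment (detector, embedding model, scorer, calibrator and data filters) and runs it: selecting test pairs, building calibration pairs per category, fitting an LR system per category and predicting LRs for the test pairs.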

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export

class Experiment:
    """Configuration and pipeline of a single likelihood-ratio experiment.

    An instance stores which detector, embedding model, scorer and calibrator
    are used, which data is selected (calibration databases, ENFSI years and
    the filters that define the categories) and where output is written.
    `perform` runs the complete pipeline and returns the collected results.
    """

    def __init__(self,
        detector: str,
        embeddingModel: str,
        scorer: "BaseEstimator",
        calibrator: "BaseEstimator",
        calibration_db: List[str],
        enfsi_years: List[int],
        filters: List[str],
        face_image_filters: List[str],
        metrics: str,
        n_calibration_pairs: int,
        embedding_model_as_scorer: bool,
        root_output_dir: str
        ):
        """Store the configuration and derive the output directory from it."""

        self.detector = detector
        self.embeddingModel = embeddingModel
        self.scorer = scorer
        self.calibrator = calibrator
        self.calibration_db = calibration_db
        self.enfsi_years = enfsi_years
        self.filters = filters
        self.face_image_filters = face_image_filters
        self.metrics = metrics
        self.n_calibration_pairs = n_calibration_pairs
        self.embedding_model_as_scorer = embedding_model_as_scorer
        self.root_output_dir = root_output_dir
        # Must be called as a method; it reads the attributes assigned above.
        self.output_dir = self.get_output_dir()

    def get_output_dir(self):
        """Return the output folder path for this configuration.

        The folder name combines the detector, the embedding model, a marker
        telling whether the embedding model doubles as scorer, and the
        calibrator name (its string form up to the first "(").
        """
        if isinstance(self.calibrator, IsotonicCalibrator):
            calibrator = 'Isotonic Calibrator'
        else:
            calibrator = str(self.calibrator)
        calibrator_name = calibrator.split("(")[0]

        relation = 'emb=scorer' if self.embedding_model_as_scorer else 'emb<>scorer'
        folder = f'{self.detector}_{self.embeddingModel}({relation})_{calibrator_name}'
        return os.path.join(self.root_output_dir, folder)

    def perform(self) -> Dict[str, list]:
        """
        Function to run a single experiment with pipeline:
        - Fit model on train data
        - Fit calibrator on calibrator data
        - Evaluate test set

        Returns the results mapping produced by `predict_lr`. When no
        category could be fitted, no LRs are predicted and the mapping only
        holds the 'original_test_pairs' entry.
        """

        # Create folder
        self.create_output_dir()
        session = connect_db()

        # Get test pairs per category.
        test_pairs_per_category = self.get_test_pairs_per_category(session)
        # Get calibration pairs per category.
        calibration_pairs_per_category = self.get_calibration_pairs_per_category(
            test_pairs_per_category.keys(), session)
        # todo: optionally export the calibration pairs per category.

        # Generate lr_system per category.
        generated = self.generate_lr_systems(calibration_pairs_per_category,
                                             test_pairs_per_category, session)
        if generated is None:
            # No category could be fitted. generate_lr_systems has already
            # removed those categories from test_pairs_per_category in place,
            # so there is nothing left to predict.
            lr_systems = {}
        else:
            lr_systems, test_pairs_per_category = generated

        # Predict LR
        results = self.predict_lr(lr_systems, test_pairs_per_category, session)

        # todo: make necessary variables for graphs.
        return results

    def get_test_pairs_per_category(self, session):
        """Group the valid test rows by the category of their ENFSI pair.

        Returns a defaultdict mapping each category to the list of
        (pair, first face image, second face image) rows that belong to it.
        """
        # todo: improve efficiency.
        test_pairs_per_category = defaultdict(list)
        for row in self.get_valid_test_pairs(session):
            category = row[0].get_category(self.filters, self.face_image_filters,
                                           self.detector, self.embeddingModel)
            test_pairs_per_category[category].append(row)
        return test_pairs_per_category

    def get_test_pairs(self, session):
        """Return all ENFSI pairs whose second image is from a selected year."""
        year_selected = EnfsiPair.second.has(EnfsiImage.year.in_(self.enfsi_years))
        return session.query(EnfsiPair).filter(year_selected).all()

    def get_valid_test_pairs(self, session) -> "List[Tuple[EnfsiPair, FaceImage, FaceImage]]":
        """Return the test pairs for which a face was detected in both images.

        Each element is a tuple (pair, face image of the first image,
        face image of the second image) for the configured detector and
        embedding model.
        """
        first_cropped_image = aliased(CroppedImage)
        second_cropped_image = aliased(CroppedImage)

        # Resolve the configured detector / embedding model names to ids.
        det_id = session.query(Detector.detector_id) \
            .filter(Detector.name == self.detector).one()[0]
        emb_id = session.query(EmbeddingModel.embeddingModel_id) \
            .filter(EmbeddingModel.name == self.embeddingModel).one()[0]

        # `== True` is intentional: it builds a SQL expression.
        face_image_rows = session.query(CroppedImage.image_id, FaceImage) \
            .filter(CroppedImage.detector_id == det_id, CroppedImage.face_detected == True) \
            .filter(FaceImage.croppedImage_id == CroppedImage.croppedImage_id) \
            .filter(FaceImage.embeddingModel_id == emb_id).all()

        # Map image id -> face image.
        # NOTE(review): because this is a defaultdict, looking up an image id
        # without a face image below yields a freshly constructed FaceImage
        # instead of failing -- confirm that this is intended.
        face_image_dict = defaultdict(FaceImage)
        for image_id, face_image in face_image_rows:
            face_image_dict[image_id] = face_image

        pairs_in_years = session.query(EnfsiPair) \
            .filter(EnfsiPair.second.has(EnfsiImage.year.in_(self.enfsi_years)))

        # Pairs whose first image has a detected face for this detector ...
        first_ok = pairs_in_years \
            .join(first_cropped_image, EnfsiPair.first_id == first_cropped_image.image_id) \
            .filter(first_cropped_image.detector_id == det_id,
                    first_cropped_image.face_detected == True)

        # ... and pairs whose second image has one.
        second_ok = pairs_in_years \
            .join(second_cropped_image, EnfsiPair.second_id == second_cropped_image.image_id) \
            .filter(second_cropped_image.detector_id == det_id,
                    second_cropped_image.face_detected == True)

        # Keep only the pairs that satisfy both conditions.
        both_ok = first_ok.intersect(second_ok).all()

        return [
            (pair, face_image_dict[pair.first.image_id], face_image_dict[pair.second.image_id])
            for pair in both_ok
        ]

    def get_calibration_pairs_per_category(self, categories, session):
        """Build the calibration face pairs for every pair category.

        A pair category is indexed as (category of first image, category of
        second image). Returns a dict mapping each pair category to its
        calibration pairs.
        """
        cal_face_pairs = {}
        emb_facevacs = (self.embeddingModel == 'FaceVACs')

        for pair_category in categories:
            same_category = pair_category[0] == pair_category[1]
            first_images = self.get_filtered_images(filter_values=pair_category[0],
                                                    session=session)
            if same_category:
                # Both sides are drawn from the same set of images.
                second_images = first_images
            else:
                second_images = self.get_filtered_images(filter_values=pair_category[1],
                                                         session=session)

            if emb_facevacs:
                pairs = get_calibration_facepairs_facevacs(
                    first_list_of_face_images=first_images,
                    second_list_of_face_images=second_images,
                    number_of_pairs=self.n_calibration_pairs,
                    session=session
                )
            elif same_category:
                # No second list is passed here, exactly as before.
                pairs = make_cal_face_pairs(first_list_of_face_images=first_images,
                                            number_of_pairs=self.n_calibration_pairs)
            else:
                pairs = make_cal_face_pairs(first_list_of_face_images=first_images,
                                            second_list_of_face_images=second_images,
                                            number_of_pairs=self.n_calibration_pairs)
            cal_face_pairs[pair_category] = pairs
        return cal_face_pairs

    def get_filtered_images(self, filter_values: tuple, session):
        """Query the calibration face images matching one image category.

        `filter_values` holds the values for `self.filters` (attributes of
        Image) followed by the values for `self.face_image_filters`
        (attributes of FaceImage). Returns rows of
        (FaceImage, Image.identity, Image.image_id).
        """
        im_filter_values = filter_values[:len(self.filters)]
        fi_filter_values = filter_values[len(self.filters):]
        assert len(fi_filter_values) == len(self.face_image_filters)

        query = session.query(FaceImage, Image.identity, Image.image_id) \
            .join(CroppedImage, CroppedImage.croppedImage_id == FaceImage.croppedImage_id) \
            .join(Image, Image.image_id == CroppedImage.image_id) \
            .join(Detector) \
            .join(EmbeddingModel) \
            .filter(EmbeddingModel.name == self.embeddingModel,
                    Detector.name == self.detector) \
            .filter(Image.source.in_(self.calibration_db))

        # getattr also resolves attributes that are not stored directly in
        # the class __dict__ (e.g. inherited ones).
        for cal_filter, value in zip(self.face_image_filters, fi_filter_values):
            query = query.filter(getattr(FaceImage, cal_filter) == value)
        for cal_filter, value in zip(self.filters, im_filter_values):
            query = query.filter(getattr(Image, cal_filter) == value)
        return query.all()

    def _pairs_to_features(self, pairs):
        """Turn face pairs into the input expected by the LR system."""
        if self.embedding_model_as_scorer:
            # The scorer consumes the pairs themselves.
            return pairs
        if self.embeddingModel == 'FaceVACs':
            values = [pair.similarity for pair in pairs]
        else:
            # todo: check if normalizing is necessary.
            values = [pair.distance(self.metrics) for pair in pairs]
        # One feature column, one row per pair.
        return np.reshape(np.asarray(values), (-1, 1))

    def generate_lr_systems(self, calibration_pairs_per_category, test_pairs_per_category, session):
        """Fit one LR system per category on its calibration pairs.

        Categories whose calibration pairs do not contain both same-identity
        and different-identity pairs are skipped and removed in place from
        `test_pairs_per_category`.

        Returns:
            None when no category could be fitted, otherwise the tuple
            (lr_systems, test_pairs_per_category).
        """
        import copy  # local import: keeps this block self-contained

        lr_systems = {}
        for category, pairs in calibration_pairs_per_category.items():
            y_cal = np.asarray([int(pair.same_identity) for pair in pairs]).flatten()

            # Both labels are needed to fit; otherwise drop the category.
            if not 0 < np.sum(y_cal) < len(pairs):
                # pop() tolerates a category that is absent from the test pairs.
                test_pairs_per_category.pop(category, None)
                continue

            X_cal = self._pairs_to_features(pairs)
            lr_system = CalibratedScorer(self.scorer, self.calibrator)
            if self.embedding_model_as_scorer:
                lr_system.fit_calibrator(X_cal, y_cal)
                # The scorer is not fitted here, so the snapshot below keeps
                # sharing it instead of copying a possibly large model.
                memo = {id(self.scorer): self.scorer}
            else:
                lr_system.fit(X_cal, y_cal)
                memo = {}

            # Every category is fitted through the same self.scorer /
            # self.calibrator objects. Assuming fitting updates them in place
            # (TODO confirm for CalibratedScorer), a later category would
            # overwrite this fit, so store an independent snapshot.
            lr_systems[category] = copy.deepcopy(lr_system, memo)

        if not lr_systems:
            return None

        return lr_systems, test_pairs_per_category

    def predict_lr(self, lr_systems, test_pairs_per_category, session):
        """Predict LRs for all test pairs with the LR system of their category.

        Returns a defaultdict(list) with the concatenated 'test_pairs',
        'lrs_predicted', 'y_test' and 'test_norm_distances' over all
        categories, plus 'original_test_pairs' from `get_test_pairs`.
        """
        results = defaultdict(list)
        for category, row_test_pairs in test_pairs_per_category.items():
            # todo: converting pairs to facepairs. not the best place to do this.
            face_pairs = [FacePair(row[1], row[2], row[0].same_identity)
                          for row in row_test_pairs]

            X_test = self._pairs_to_features(face_pairs)
            lrs_predicted = lr_systems[category].predict_lr(X_test)

            results["test_pairs"] += [row[0] for row in row_test_pairs]
            results["lrs_predicted"] += list(lrs_predicted)
            results["y_test"] += [int(pair.same_identity) for pair in face_pairs]
            results["test_norm_distances"] += [pair.norm_distance for pair in face_pairs]

        results['original_test_pairs'] = self.get_test_pairs(session)

        return results

    def get_test_face_pairs(self, test_pairs, session):
        """Placeholder; not implemented."""
        pass

    def create_output_dir(self):
        """Create the output directory if it does not exist yet."""
        os.makedirs(self.output_dir, exist_ok=True)

    def __str__(self):
        """
        Converts the configuration of this experiment to a string that can be
        used to generate file names for example.

        The string joins the scorer, the calibrator and, when a `params`
        mapping has been attached to the instance, its values.
        """
        parts = [self.scorer, self.calibrator]
        # `params` is not set by __init__; only use it when present.
        params = getattr(self, 'params', None)
        if params is not None:
            parts.append('_'.join(map(str, params.values())))
        return '_'.join(map(str, parts)).replace(':', '-')  # Windows forbids ':'

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()