In [1]:
import numpy as np

In [None]:
def record_positive_anchor_images(file_path, image_name, image_type, n_samples=300, create_new=False, to_log=False):
    """Capture camera frames and store them as datasets in an HDF5 file.

    Frames are read from the camera, cropped, written as JPEG files into a
    temporary directory and, once capturing stops, copied into the
    ``image_type`` group of the HDF5 file. Capturing stops after ``n_samples``
    frames, when 'q' is pressed in the preview window, or when the camera
    stops delivering frames.

    Parameters
    ----------
    file_path: str
        Path to the HDF5 file the positive and anchor images will be stored in.
    image_name: str
        Nickname of the person whose image will be captured. Used as the prefix
        of every dataset name.
    image_type: 'positive' | 'anchor'
        Image type, which is also the name of the HDF5 group written to.
    n_samples: int
        The amount of images that should be captured.
    create_new: bool
        Whether to delete the existing ``image_type`` group before capturing.
        When False, the captured images are added to the existing group.
    to_log: bool
        Whether or not to print the counter of images captured since the
        beginning of the session.

    Raises
    ------
    ValueError
        If ``image_type`` is neither 'anchor' nor 'positive'.
    """
    import tempfile  # function-scope import: only this function needs it

    if image_type not in ('anchor', 'positive'):
        raise ValueError(f"image_type must be 'anchor' or 'positive', got {image_type!r}")

    # Both context managers guarantee that the HDF5 handle is closed and the
    # temporary JPEG folder is removed, whatever happens below.
    with h5py.File(file_path, 'a') as file_io, \
            tempfile.TemporaryDirectory(prefix='temp_storage_') as temp_storage:
        if create_new and image_type in file_io:
            del file_io[image_type]
        # require_group reuses an existing group instead of raising like create_group.
        file_io.require_group(image_type)

        counter = 0
        cam = cv2.VideoCapture(cv2.CAP_V4L2)
        try:
            while cam.isOpened() and counter < n_samples:
                grabbed, frame = cam.read()
                if not grabbed:
                    # No frame was delivered; keep what has been captured so far.
                    break

                # NOTE(review): the frame is converted to RGB before imshow/imwrite,
                # which OpenCV documents as expecting BGR - confirm that the
                # resulting channel order of the stored images is intended.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = frame[120:370, 200:450, :]

                cv2.imshow('Verification', frame)

                # uuid1 keeps file (and later dataset) names unique across sessions.
                path = os.path.join(temp_storage, f'{image_name}-{uuid.uuid1()}.jpg')
                cv2.imwrite(path, frame)
                counter += 1
                if to_log:
                    print(f'#{counter} {image_type} added!')

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except Exception as e:
            # Best effort: report the problem but still store the frames captured so far.
            print(e)
        finally:
            cam.release()
            cv2.destroyAllWindows()

            for image in os.listdir(temp_storage):
                byte_img = tf.io.read_file(os.path.join(temp_storage, image))
                img = tf.io.decode_jpeg(byte_img).numpy()
                # HDF5 object paths always use '/', independent of the OS separator.
                path = f'{image_type}/{os.path.splitext(image)[0]}'
                file_io.create_dataset(path, (250, 250, 3), chunks=True)
                file_io[path][:, :, :] = img

In [1]:
class DataLoader():
    """Data workflow for downloading, extracting, moving and getting data.

    Attributes
    ----------
    log_prefix: str
        Prefix for log functions and decorators.
    url: str
        The url source from which the file will be downloaded.
    file_name: str
        Name of the file which will be downloaded from the url
        (the last '/'-separated part of the url).
    to_log: bool
        Whether to turn on logging or not.
    """
    log_prefix = 'DataLoader::'

    def __init__(self, url, to_log=False):
        self.url       = url
        self.file_name = self.url.split('/')[-1]
        self.to_log    = to_log

        self.__downloaded_file = None
        self.__response        = None

    def __download_tar_file(self):
        """Opens the tar archive behind the url as a stream.

        Raises
        ------
        AssertionError
            If the server does not answer with a successful status code.
        """
        response = requests.get(self.url, stream=True)
        if not response.ok:
            response.close()
            # Raised explicitly (instead of `assert`) so the check survives `python -O`.
            raise AssertionError("Established url is not reachable!")

        try:
            # 'r|*' lets tarfile detect the compression of the stream, so the
            # accepted tgz/tbz/txz archives are all opened correctly.
            self.__downloaded_file = tarfile.open(fileobj=response.raw, mode='r|*')
        except Exception:
            response.close()
            raise
        self.__response = response

    def __download_file(self):
        """Downloads a file from the specified url.

        Raises
        ------
        Exception
            If the file extension is not one of the supported tar formats.
        """
        extension = self.file_name.split('.')[-1]

        # Checked before any network traffic: an unsupported format never needs a request.
        if extension not in ('tgz', 'tbz', 'txz'):
            raise Exception('Download is not performed as this format is not maintained!')

        download = time_logger(log_name=f'{self.log_prefix}download_tar_file', log_on=self.to_log)\
        (self.__download_tar_file)
        download()

    def __extract_tgz_file(self):
        """Extracts downloaded tgz file into the current working directory.
        """
        assert self.file_name.endswith('.tgz'), "Downloaded dataset is not tgz format!"

        try:
            # NOTE(review): the archive comes from a remote url and extractall
            # trusts the member paths it contains - consider an extraction
            # filter where the running Python version supports one.
            self.__downloaded_file.extractall(path='.')
        finally:
            # The stream can be consumed only once, so it is released either way.
            self.__downloaded_file.close()
            self.__downloaded_file = None
            if self.__response is not None:
                self.__response.close()
                self.__response = None

    def __extract_file(self):
        """Extracts downloaded file.

        Raises
        ------
        Exception
            If the file extension is not supported for extraction.
        """
        assert self.__downloaded_file is not None, "Dataset is not downloaded yet!"

        extension = self.file_name.split('.')[-1]

        if extension != 'tgz':
            raise Exception('Extraction is not performed as this format is not maintained!')

        extract = time_logger(log_name=f'{self.log_prefix}extract_tgz_file', log_on=self.to_log)\
        (self.__extract_tgz_file)
        extract()

    @staticmethod
    def _collect_image_files(data_path, quantity_of_images, data_path_type='standard'):
        """Picks the image files to store and the dataset name of each one.

        Parameters
        ----------
        data_path: str
            Path to data folder with images or folders (classnames) with images.
        quantity_of_images: int
            The maximum amount of entries of 'data_path' to pick.
        data_path_type: 'standard' | 'all_in_one'
            'standard' picks the first listed image of every chosen folder (classname).
            'all_in_one' picks the chosen images directly from 'data_path'.

        Returns
        -------
        selected: list
            (dataset_name, image_file) tuples, where dataset_name is the image
            file name up to its first '.'.

        Raises
        ------
        ValueError
            If 'data_path_type' is not one of the supported values.
        """
        if data_path_type not in ('standard', 'all_in_one'):
            raise ValueError(
                f"No such data_path_type: {data_path_type!r}! Choose followings: 'standard', 'all_in_one'"
            )

        root = os.listdir(data_path)
        np.random.shuffle(root)

        selected = []
        for entry in root[:quantity_of_images]:
            if data_path_type == 'standard':
                images = os.listdir(os.path.join(data_path, entry))
                if not images:
                    # An empty class folder has nothing to contribute.
                    continue
                image = images[0]
                image_file = os.path.join(data_path, entry, image)
            else:
                image = entry
                image_file = os.path.join(data_path, entry)
            selected.append((image.split('.')[0], image_file))

        return selected

    def __to_hdf5(self, h5file_path, data_path, group, quantity_of_images, data_path_type='standard'):
        """Moves data to h5 file.

        Parameters
        ----------
        h5file_path: str
            File path with the .h5 extension, in which images will be moved.
            An existing file is overwritten.
        data_path: str
            Path to data folder with images or folders (classnames) with images.
        group: str
            Group (folder) name in the .h5, in which images (datasets) will be stored.
        quantity_of_images: int
            The amount of images that will be moved to the .h5 file.
        data_path_type: 'standard' | 'all_in_one'
            'standard' gains the 'data_path', which consists of folder (classnames) with images.
            'all_in_one' gain the 'data_path', which consists of images only.
        """
        assert h5file_path.endswith('h5'), "Don't forget to add .h5 to file name!"

        # Resolved before the file is opened in 'w' mode, so a bad data_path or
        # data_path_type does not truncate an existing .h5 file.
        selected = self._collect_image_files(data_path, quantity_of_images, data_path_type)

        with h5py.File(h5file_path, 'w', driver='core', block_size=1024) as h5file:
            h5file.create_group(group)

            for dataset_name, image_file in selected:
                # HDF5 object paths always use '/', independent of the OS separator.
                image_path = f'{group}/{dataset_name}'
                h5file.create_dataset(image_path, (250, 250, 3), dtype='f', chunks=True)
                byte_img = tf.io.read_file(image_file)
                h5file[image_path][:, :, :] = tf.io.decode_jpeg(byte_img).numpy()

    def __move_data(self, file_path, data_path, extension, **kwargs):
        """Moves data to another file/folder.

        Parameters
        ----------
        file_path: str
            File path, in which images will be moved.
        data_path: str
            Path to data folder with images or folders (classnames) with images.
        extension: str
            Extension of the file in which images will be stored, like 'hdf5'.
        **kwargs
            Format specific arguments. 'hdf5' requires 'group' and
            'quantity_of_images' and accepts 'data_path_type'.

        Raises
        ------
        Exception
            If the extension is not supported.
        """
        if extension != 'hdf5':
            raise Exception('Move is not performed as this format is not maintained!')

        assert kwargs.get('group') and \
               kwargs.get('quantity_of_images'),\
        "Group or/and quantity_of_images arguments haven't been set for hdf5. Set the arguments!"

        move = time_logger(log_name=f'{self.log_prefix}to_hdf5', log_on=self.to_log)\
        (self.__to_hdf5)
        move(file_path, data_path, **kwargs)

    def __from_hdf5(self, h5file_path, mode, folder=None, img=None):
        """Gets data from the .h5 file.

        Parameters
        ----------
        h5file_path: str
            Path to the .h5 file to get data from.
        mode: 'all' | 'dir' | 'img'
            Specifies the type of get.
            'all' get all datasets (images) from every group (folder).
            'dir' get all datasets (images) from specified group (folder).
            'img' get specified dataset (image) from the specified group (folder).
        folder: str | None
            If mode equals 'dir' the 'folder' argument is required. Specifies the group (folder),
            from which all images will be obtained.
        img: str | None
            If mode equals 'img' the 'folder' and 'img' arguments are required. Specifies the dataset (image)
            and group (folder) to get.

        Returns
        -------
        data: list
            Returns gathered data from the .h5 file as numpy arrays.

        Raises
        ------
        Exception
            If the mode is unknown or its required arguments are missing.
        """
        assert h5file_path.endswith('h5'), "Don't forget to add .h5 to file name!"
        if mode == 'dir' and folder is None:
            raise Exception('In \'dir\' mode you must specify folder argument!')
        if mode == 'img' and (folder is None or img is None):
            raise Exception('In \'img\' mode you must specify folder and img arguments!')

        with h5py.File(h5file_path, 'r') as h5file:
            if mode == 'img':
                return [np.array(h5file[folder + '/' + img])]
            if mode == 'all':
                folders = list(h5file.keys())
            elif mode == 'dir':
                folders = [folder]
            else:
                raise Exception('No such mode in hdf5! Choose followings: \'all\', \'dir\', \'img\'')

            return [
                np.array(h5file[name + '/' + image])
                for name in folders
                for image in h5file[name].keys()
            ]

    def __get_data(self, file_path, extension, mode, **kwargs):
        """Gets data from a file.

        Parameters
        ----------
        file_path: str
            Path to the file to get data from.
        extension: str
            Extension of the file in which images are stored, like 'hdf5'.
        mode: 'all' | 'dir' | 'img'
            Specifies the type of get.
            'all' get all images from every folder.
            'dir' get all images from specified folder.
            'img' get specified image from the specified folder.

        Returns
        -------
        data: list
            Returns gathered data from a file.

        Raises
        ------
        Exception
            If the extension is not supported.
        """
        assert os.path.exists(file_path), f"File {file_path} doesn't exist!"

        if extension != 'hdf5':
            raise Exception('Get is not performed as this format is not maintained!')

        return self.__from_hdf5(file_path, mode, **kwargs)

    def get_all(self, file_path, extension):
        """Gets all data from a file.

        Parameters
        ----------
        file_path: str
            Path to the file to get data from.
        extension: str
            Extension of the file in which images are stored, like 'hdf5'.

        Returns
        -------
        data: list
            Returns all gathered data from a file.
        """
        return self.__get_data(file_path, extension, mode='all')

    def get_by_dir(self, file_path, extension, **kwargs):
        """Gets data from a file by directory.

        Parameters
        ----------
        file_path: str
            Path to the file to get data from.
        extension: str
            Extension of the file in which images are stored, like 'hdf5'.
        **kwargs
            Must contain 'folder', the directory (group) to read.

        Returns
        -------
        data: list
            Returns gathered data from a file by directory.
        """
        return self.__get_data(file_path, extension, mode='dir', **kwargs)

    def get_by_image(self, file_path, extension, **kwargs):
        """Gets data from a file by image and directory.

        Parameters
        ----------
        file_path: str
            Path to the file to get data from.
        extension: str
            Extension of the file in which images are stored, like 'hdf5'.
        **kwargs
            Must contain 'folder' and 'img', the directory (group) and image to read.

        Returns
        -------
        data: list
            Returns gathered data from a file by image and directory.
        """
        return self.__get_data(file_path, extension, mode='img', **kwargs)

    def load_file(self, to_download=True, to_extract=True, to_move=True, **kwargs):
        """Data loading workflow.

        Parameters
        ----------
        to_download: bool
            Whether to download file or not.
        to_extract: bool
            Whether to extract downloaded file or not.
        to_move: bool
            Whether to move downloaded file or not.
        **kwargs
            Arguments of the move step; 'file_path', 'data_path' and
            'extension' are required when 'to_move' is set.
        """
        if to_download:
            self.__download_file()
        if to_extract:
            self.__extract_file()
        if to_move:
            assert kwargs.get('file_path') and \
                   kwargs.get('data_path') and \
                   kwargs.get('extension'),\
            "file_path or data_path or extension argument(-s) is not set. Set following arguments!"

            self.__move_data(**kwargs)

In [None]:
class DataPreprocessor():
    """Data workflow for augmenting, pairing, merging and splitting image data.

    Attributes
    ----------
    log_prefix: str
        Prefix for log functions and decorators.
    positive_data: list
        Positive dataset to preprocess.
    negative_data: list
        Negative dataset to preprocess.
    anchor_data: list
        Anchor dataset to preprocess.
    len_data: int
        Amount of images in each of the three datasets.
    to_log: bool
        Whether to turn on logging or not.
    """
    log_prefix = 'DataPreprocessor::'

    def __init__(self, positive_data, negative_data, anchor_data, to_log=False):
        assert len(positive_data) == len(negative_data) == len(anchor_data), \
        "Shapes of all positive, negative and anchor datasets must coincide!"

        self.positive_data = positive_data
        self.negative_data = negative_data
        self.anchor_data   = anchor_data
        self.len_data      = len(positive_data)
        self.to_log        = to_log

        self.__anchor_positive_data = None
        self.__anchor_negative_data = None
        self.__preprocessed_data    = None
        self.__train_data           = None
        self.__test_data            = None

    def __zip_data(self):
        """Zips anchors with positives/negatives and a label into tf.data.Dataset.

        Anchor-positive pairs are labelled 1, anchor-negative pairs 0.
        """
        # from_tensor_slices yields one element per image; from_tensors would
        # wrap the whole list into a single element and the zipped dataset
        # would then contain just one (anchors, images, label) item.
        anchors = tf.data.Dataset.from_tensor_slices(self.anchor_data)

        self.__anchor_positive_data = tf.data.Dataset.zip((
            anchors,
            tf.data.Dataset.from_tensor_slices(self.positive_data),
            tf.data.Dataset.from_tensor_slices(tf.ones(self.len_data)),
        ))

        self.__anchor_negative_data = tf.data.Dataset.zip((
            anchors,
            tf.data.Dataset.from_tensor_slices(self.negative_data),
            tf.data.Dataset.from_tensor_slices(tf.zeros(self.len_data)),
        ))

        if self.to_log:
            print(self.__anchor_positive_data)
            print(self.__anchor_negative_data)

    def __merge_data(self, shuffle=True):
        """Merges two data into one dataset.

        Caches tensors in RAM.
        Shuffles data in the dataset.

        Parameters
        ----------
        shuffle: bool
            Whether or not shuffle merged data.
        """
        assert self.__anchor_positive_data is not None and \
               self.__anchor_negative_data is not None, \
        "Initialized data should be zipped before merge!"

        # Dataset transformations return new datasets, so every result has to be kept.
        merged = self.__anchor_positive_data.concatenate(self.__anchor_negative_data)
        merged = merged.cache()

        if shuffle:
            # A fixed order across iterations keeps the later take/skip split
            # stable: reshuffling would move samples between train and test.
            merged = merged.shuffle(
                buffer_size=max(1, self.len_data * 2),
                reshuffle_each_iteration=False,
            )

        self.__preprocessed_data = merged
        if self.to_log:
            print(self.__preprocessed_data)

    def __augmentate_image(self, image, seed, brightness, contrast, flip_left_right, jpeg_quality, saturation):
        """Augments, resizes and scales a single image.

        Augmentations whose argument is None (or 0 / False) are skipped.
        """
        # NOTE(review): pixels are divided by 255 only after the colour ops;
        # confirm that the incoming value range is what those ops expect.
        if brightness:
            image = tf.image.stateless_random_brightness(image, max_delta=brightness, seed=seed)
        if contrast is not None:
            image = tf.image.stateless_random_contrast(image, lower=contrast[0], upper=contrast[1], seed=seed)
        if flip_left_right:
            image = tf.image.stateless_random_flip_left_right(image, seed=seed)
        if jpeg_quality is not None:
            image = tf.image.stateless_random_jpeg_quality(
                image,
                min_jpeg_quality=jpeg_quality[0],
                max_jpeg_quality=jpeg_quality[1],
                seed=seed,
            )
        if saturation is not None:
            image = tf.image.stateless_random_saturation(image, lower=saturation[0], upper=saturation[1], seed=seed)

        image = tf.image.resize(image, IMAGE_SIZE)
        return image / 255.0

    def __augmentate_data(
        self,
        brightness=0,
        contrast=None,
        flip_left_right=False,
        jpeg_quality=None,
        saturation=None,
    ):
        """Perform data augmentation.

        Replaces the positive, negative and anchor datasets with their
        augmented, resized (to IMAGE_SIZE) and scaled (divided by 255) versions.

        Parameters
        ----------
        brightness: float
            Maximum random brightness delta. 0 disables the augmentation.
        contrast: list[float, float] | tuple(float, float) | None
            Lower and upper bounds for the random contrast factor.
        flip_left_right: bool
            Whether to randomly flip images horizontally (left to right) or not.
        jpeg_quality: list[float, float] | tuple(float, float) | None
            Minimum and maximum jpeg encodings quality to use.
        saturation: list[float, float] | tuple(float, float) | None
            Lower and upper bounds for the random saturation factor.
        """
        options = dict(
            brightness=brightness,
            contrast=contrast,
            flip_left_right=flip_left_right,
            jpeg_quality=jpeg_quality,
            saturation=saturation,
        )
        base_seed = (42, 24)

        def augment_all(images, role):
            # Stateless ops return the same result for the same seed, so each
            # image gets its own (still reproducible) seed; a shared seed
            # would apply one identical transformation to every image.
            return [
                self.__augmentate_image(image, (base_seed[0] + role, base_seed[1] + index), **options)
                for index, image in enumerate(images)
            ]

        self.positive_data = augment_all(self.positive_data, 0)
        self.negative_data = augment_all(self.negative_data, 1)
        self.anchor_data   = augment_all(self.anchor_data, 2)

    def __train_test_split(self, test_size=0.2):
        """Performs data split on train and test datasets.

        Both splits are batched by 16 and prefetched.

        Parameters
        ----------
        test_size: float
            The fraction of test size in respect of inputted data.
        """
        assert self.__preprocessed_data is not None, "The dataset must be merged before the 'train_test_split'!"

        total       = self.len_data * 2
        train_count = round(total * (1 - test_size))
        test_count  = round(total * test_size)

        self.__train_data = self.__preprocessed_data.take(train_count).batch(16).prefetch(8)
        self.__test_data  = self.__preprocessed_data.skip(train_count).take(test_count).batch(16).prefetch(8)

    def __preprocess_data(self, shuffle, test_size=0.2, **kwargs):
        """Preprocess data.

        Augments the images, resizes them to IMAGE_SIZE and divides the pixels
        by 255, then zips, merges and splits the data.

        Parameters
        ----------
        shuffle: bool
            Whether or not to shuffle dataset.
        test_size: float
            The fraction of test size in respect of inputted data.
        **kwargs
            Augmentation arguments, see '__augmentate_data'.
        """
        time_logger(log_name=f'{self.log_prefix}augmentate_data', log_on=self.to_log)\
        (self.__augmentate_data)(**kwargs)

        time_logger(log_name=f'{self.log_prefix}zip_data', log_on=self.to_log)\
        (self.__zip_data)()

        time_logger(log_name=f'{self.log_prefix}merge_data', log_on=self.to_log)\
        (self.__merge_data)(shuffle=shuffle)

        time_logger(log_name=f'{self.log_prefix}train_test_split', log_on=self.to_log)\
        (self.__train_test_split)(test_size)

    def perform_preprocessing(
        self,
        brightness=0.02,
        contrast=(0.6, 1),
        flip_left_right=True,
        jpeg_quality=(90, 100),
        saturation=(0.9, 1),
        shuffle=True,
        test_size=0.2,
    ):
        """Performs data preprocessing.

        The defaults are the values this method previously applied regardless
        of its arguments, so a call without arguments keeps using them while
        explicitly passed values are now honoured.

        Parameters
        ----------
        brightness: float
            Maximum random brightness delta.
        contrast: list[float, float] | tuple(float, float) | None
            Lower and upper bounds for the random contrast factor.
        flip_left_right: bool
            Whether to randomly flip images horizontally (left to right) or not.
        jpeg_quality: list[float, float] | tuple(float, float) | None
            Minimum and maximum jpeg encodings quality to use.
        saturation: list[float, float] | tuple(float, float) | None
            Lower and upper bounds for the random saturation factor.
        shuffle: bool
            Whether shuffle or not preprocessed data.
        test_size: float
            The fraction of test size in respect of inputted data.
        """
        time_logger(log_name=f'{self.log_prefix}process_data', log_on=self.to_log)\
        (self.__preprocess_data)(
            shuffle,
            test_size=test_size,
            brightness=brightness,
            contrast=contrast,
            flip_left_right=flip_left_right,
            jpeg_quality=jpeg_quality,
            saturation=saturation,
        )

    def get_train_test_data(self):
        """Get preprocessed train test data.

        Returns
        -------
        train_data: tf.data.Dataset
            Train data split.
        test_data: tf.data.Dataset
            Test data split.
        """
        assert self.__train_data is not None and \
               self.__test_data is not None, \
        "Run train_test_split before requesting train test data!"

        return (self.__train_data, self.__test_data)

    def get_data(self):
        """Get preprocessed or not data.

        Returns
        -------
        data: tuple | tf.data.Dataset
            The merged dataset when preprocessing has been performed, otherwise
            the (positive_data, negative_data, anchor_data) tuple.
        """
        if self.__preprocessed_data is not None:
            return self.__preprocessed_data
        return (self.positive_data, self.negative_data, self.anchor_data)

In [None]:
class DistanceL1(Layer):
    """Layer returning the element-wise absolute difference of two embeddings.
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, input_embedding, valid_embedding):
        """Computes |input_embedding - valid_embedding| element-wise.

        Parameters
        ----------
        input_embedding:
            First operand of the difference.
        valid_embedding:
            Second operand of the difference.
        """
        difference = input_embedding - valid_embedding
        return tf.math.abs(difference)

In [None]:
class SiameseNN(Model):
    """CNN Siamese model for face recognition.

    Holds the layers of one embedding branch. 'make_embedding' wires them into
    an embedding model and 'call' combines two branches into the final
    verification model.
    """
    def __init__(self, **kwargs):
        # Forward keyword arguments (e.g. name) instead of silently dropping them.
        super().__init__(**kwargs)
        self.input_        = Input(shape=(*IMAGE_SIZE, 3), name='input')
        self.conv2d_1      = Conv2D(filters=64, kernel_size=(10, 10), activation='relu')
        self.max_pooling_1 = MaxPool2D(pool_size=(2, 2), padding='same')
        self.conv2d_2      = Conv2D(filters=128, kernel_size=(7, 7), activation='relu')
        self.max_pooling_2 = MaxPool2D(pool_size=(2, 2), padding='same')
        self.conv2d_3      = Conv2D(filters=128, kernel_size=(4, 4), activation='relu')
        self.max_pooling_3 = MaxPool2D(pool_size=(2, 2), padding='same')
        self.conv2d_4      = Conv2D(filters=256, kernel_size=(4, 4), activation='relu')
        self.flatten_      = Flatten()
        self.dense_        = Dense(units=4096, activation='sigmoid')

        self.own_embedding = None

    def make_embedding(self, name=''):
        """Embeds pipeline for a pair of images.

        Chains the layers of this branch and stores the resulting model in
        'own_embedding'.

        Parameters
        ----------
        name: str
            Name of the embedded pipeline.
        """
        x = self.conv2d_1(self.input_)
        x = self.max_pooling_1(x)
        x = self.conv2d_2(x)
        x = self.max_pooling_2(x)
        x = self.conv2d_3(x)
        x = self.max_pooling_3(x)
        x = self.conv2d_4(x)
        x = self.flatten_(x)
        x = self.dense_(x)

        self.own_embedding = Model(inputs=self.input_, outputs=x, name=name)

    def call(self, other):
        """Builds the verification model out of this and another branch.

        NOTE(review): this replaces the usual keras 'call' signature with a
        model builder; presumably it is meant to be invoked explicitly as
        'model.call(other)' - confirm against the callers.

        Parameters
        ----------
        other: SiameseNN
            Branch that embeds the validation image. Passing the same instance
            applies one embedding to both inputs.

        Returns
        -------
        model: Model
            Model taking [real_input, valid_input] images and returning a
            sigmoid score computed from the L1 distance of their embeddings.
        """
        assert self.own_embedding is not None, f"{self.__class__} must create own embedding through 'make_embedding' to create model!"
        assert other.own_embedding is not None, f"{other.__class__} must create own embedding through 'make_embedding' to create model!"

        # Same shape as the embedding input, including the channel axis.
        real_input  = Input(name='real_input', shape=(*IMAGE_SIZE, 3))
        valid_input = Input(name='valid_input', shape=(*IMAGE_SIZE, 3))

        # The distance layer needs the embedding tensors, so each embedding
        # model is applied to its input first; this also connects both inputs
        # to the output of the returned model.
        distance_l1 = DistanceL1()(
            self.own_embedding(real_input),
            other.own_embedding(valid_input),
        )

        output = Dense(units=1, activation='sigmoid')(distance_l1)

        return Model(inputs=[real_input, valid_input], outputs=output, name='SiameseNN')

    def get_instance(self):
        """Returns this instance."""
        return self