In [1]:
import pandas as pd
import numpy as np
import os
import cv2
import lmdb

In [2]:
class Preprocess:
    """Crop annotated boxes out of images and package the crops for one split.

    For the split named ``tvt`` the pipeline
    1. reads ``{input_path}/{tvt}/_annotations.csv`` (``get_info``),
    2. writes one cropped ``.jpg`` per annotation row into
       ``{output_path}/{tvt}/`` (``crop``),
    3. shortens the crop file names (``rename``),
    4. stores the crops in an LMDB database at ``{output_path}/{tvt}/lmdb``
       (``create_lmdb``), and
    5. appends ``path<TAB>label`` lines to ``{output_path}/gt.txt``
       (``get_gt``).

    All work is triggered from ``__init__``; steps 2-5 can be switched off
    individually through the boolean constructor arguments.
    """

    def __init__(
        self,
        input_path=None,
        output_path=None,
        tvt=None,
        crop=True,
        rename=True,
        create_lmdb=True,
        get_gt=True
    ):
        """Run the preprocessing pipeline for a single split.

        Args:
            input_path: Directory containing one sub-directory per split.
            output_path: Directory that receives the crops, LMDB and gt.txt.
            tvt: Name of the split sub-directory (e.g. ``'train'``).
            crop: Run ``crop()`` when true.
            rename: Run ``rename()`` when true.
            create_lmdb: Run ``create_lmdb()`` when true.
            get_gt: Run ``get_gt()`` when true.
        """
        self.input_path = input_path
        self.output_path = output_path
        self.tvt = tvt

        self.create_dirs()
        self.get_info()

        if crop:
            self.crop()

        if rename:
            self.rename()

        if create_lmdb:
            self.create_lmdb()

        if get_gt:
            self.get_gt()

    def create_dirs(self):
        """Create ``{output_path}/{tvt}/lmdb`` (and its parents) if missing."""
        # exist_ok tolerates re-runs without hiding real failures such as
        # permission errors, which a bare ``except: pass`` would swallow.
        os.makedirs(f'{self.output_path}/{self.tvt}/lmdb', exist_ok=True)

    def get_info(self):
        """Load the split's annotation CSV into parallel per-row lists.

        Sets ``df_annotations``, ``dimensions`` (``[x, y, x2, y2]`` per row),
        ``labels``, ``filenames`` and ``image_paths``.
        """
        self.df_annotations = pd.read_csv(
            f'{self.input_path}/{self.tvt}/_annotations.csv',
            header=None,
            names=['filename', 'x', 'y', 'x2', 'y2', 'label']
        )

        # One vectorised conversion instead of an iloc lookup per row.
        self.dimensions = self.df_annotations[['x', 'y', 'x2', 'y2']].values.tolist()
        self.labels = self.df_annotations['label'].tolist()
        self.filenames = self.df_annotations['filename'].tolist()
        self.image_paths = [
            f'{self.input_path}/{self.tvt}/{filename}' for filename in self.filenames
        ]

    def crop_image(self, image, box):
        """Return the part of ``image`` covered by ``box``.

        Args:
            image: Array indexed as ``image[row, column, ...]``.
            box: Four values ``(x, y, x2, y2)``; rows ``y:y2`` and columns
                ``x:x2`` are selected.

        Returns:
            The slice ``image[y:y2, x:x2]``.
        """
        # Coordinates are cast to int and clamped at 0: a negative index
        # would otherwise wrap around to the far edge of the array and
        # silently select the wrong (usually empty) region.
        x, y, x2, y2 = (max(int(value), 0) for value in box)
        return image[y:y2, x:x2]

    def crop(self):
        """Write one cropped ``.jpg`` per annotation row into the split folder.

        Files are named ``{filename}_{y2}_{label}.jpg``. Rows whose image
        cannot be read, or whose box selects no pixels, are reported and
        skipped instead of aborting the whole run.
        """
        for image_path, dmns, label, filename in zip(
            self.image_paths,
            self.dimensions,
            self.labels,
            self.filenames
        ):
            image = cv2.imread(image_path)

            # cv2.imread signals failure by returning None, not by raising.
            if image is None:
                print(f"Failed to read image: {image_path}")
                continue

            cropped = self.crop_image(image, dmns)

            if cropped.size == 0:
                print(f"Empty crop {dmns} for image: {image_path}")
                continue

            cv2.imwrite(
                f'{self.output_path}/{self.tvt}/{filename}_{dmns[-1]}_{label}.jpg',
                cropped
            )

    def rename(self):
        """Shorten crop names to ``{first}_{second-to-last}_{last}``.

        The name is split on ``_``; the first, second-to-last and last pieces
        are kept, and spaces in the last piece become ``_``.

        NOTE: the rule is positional, so running it again on a file whose
        last piece already contains ``_`` (a multi-word label) drops the
        middle piece. Run it once per freshly cropped folder.
        """
        split_dir = f'{self.output_path}/{self.tvt}'

        for old_name in os.listdir(split_dir):

            if not old_name.endswith('.jpg'):
                continue

            parts = old_name.split('_')

            # A name without '_' has no second-to-last piece to keep.
            if len(parts) < 2:
                print(f"Skipping file with unexpected name: {old_name}")
                continue

            new_name = '_'.join([parts[0], parts[-2], parts[-1].replace(' ', '_')])

            os.rename(
                os.path.join(split_dir, old_name),
                os.path.join(split_dir, new_name)
            )

    def create_lmdb(self, map_size=int(1e9)):
        """Store every ``.jpg`` of the split folder in the split's LMDB.

        Keys are the ASCII-encoded image paths; values are the images
        re-encoded as JPEG bytes. Images that cannot be processed are
        reported and skipped.

        Args:
            map_size: Maximum size of the LMDB database in bytes.
        """
        split_dir = f'{self.output_path}/{self.tvt}'

        env = lmdb.open(f'{split_dir}/lmdb/', map_size=map_size)

        # try/finally so the environment is released even if the write
        # transaction fails.
        try:
            with env.begin(write=True) as txn:

                for name in sorted(os.listdir(split_dir)):

                    # Skips the 'lmdb' sub-directory and any stray files.
                    if not name.endswith('.jpg'):
                        continue

                    image_path = f'{split_dir}/{name}'

                    try:
                        image = cv2.imread(image_path)

                        if image is None:
                            raise ValueError(f"Failed to read image: {image_path}")

                        image_bytes = cv2.imencode('.jpg', image)[1].tobytes()
                        txn.put(image_path.encode('ascii'), image_bytes)

                    except Exception as e:
                        print(f"Error processing image: {image_path}")
                        print(f"Error message: {str(e)}")
        finally:
            env.close()

    def get_gt(self):
        """Append ``path<TAB>label`` lines for the split's crops to gt.txt.

        The label is everything after the second ``_`` of the file name, up
        to the first ``.``, with remaining ``_`` turned into spaces. The file
        ``{output_path}/gt.txt`` is opened in append mode, so it accumulates
        across splits and repeated runs add duplicate lines.
        """
        self.path = f'{self.output_path}/{self.tvt}/'

        # Only .jpg crops are listed; the 'lmdb' sub-directory must not end
        # up in the ground-truth file. Sorting makes the output order stable.
        self.info = {
            f'{self.path}{name}': ' '.join(name.split('_')[2:]).split('.')[0]
            for name in sorted(os.listdir(self.path))
            if name.endswith('.jpg')
        }

        self.output_file = f'{self.output_path}/gt.txt'

        # Append mode creates the file when it does not exist yet, so no
        # separate fallback for a missing file is needed.
        with open(self.output_file, 'a') as file:
            file.writelines(f'{key}\t{value}\n' for key, value in self.info.items())

In [3]:
def read_lmdb(lmdb_path=None, preview=2):
    """Print a short preview of an LMDB database followed by its entry count.

    For the first ``preview`` entries the ASCII-decoded key and the first
    100 hex characters of the value are printed; a key that is not valid
    ASCII is reported instead. After the scan the total number of entries
    is printed.

    Args:
        lmdb_path: Path of the LMDB environment to open read-only.
        preview: Number of leading entries to print (default 2).
    """
    env = lmdb.open(lmdb_path, readonly=True)

    count = 0

    # try/finally guarantees the environment is closed even when the scan
    # raises part-way through.
    try:
        with env.begin() as txn:

            for key, value in txn.cursor():

                if count < preview:
                    try:
                        key_str = key.decode('ascii')
                    except UnicodeDecodeError:
                        print('Error decoding key.')
                    else:
                        print(f'Key: {key_str}')
                        print(f'Value: {value.hex()[:100]}...')

                count += 1

            print(count)
    finally:
        env.close()

In [4]:
# Dataset splits to process; each name is used as a sub-directory of both
# input_path and output_path.
tvt = ["train", "valid", "test"]

In [5]:
# Directory holding one sub-directory per split with its _annotations.csv.
input_path = "../data/spelling_dictation/5_annotated_htr"
# Directory that receives the cropped images, per-split LMDBs and gt.txt.
output_path = "../data/spelling_dictation/6_cropped"

In [6]:
# True: run the Preprocess pipeline for every split.
preprocess: bool = False
# True: print a preview and the entry count of every split's LMDB.
check_lmdb: bool = True

In [7]:
# Handle each split in turn; both steps are optional and gated by the
# preprocess / check_lmdb switches.
for i in tvt:
    if preprocess:
        # Constructing Preprocess runs the whole pipeline for this split.
        Preprocess(input_path=input_path, output_path=output_path, tvt=i)

    if check_lmdb:
        read_lmdb(lmdb_path=f'{output_path}/{i}/lmdb')
        # Blank line between the reports of consecutive splits.
        print()

Key: ../data/spelling_dictation/6_cropped/train/001_1245_worried.jpg
Value: ffd8ffe000104a46494600010100000100010000ffdb00430002010101010102010101020202020204030202020205040403...
Key: ../data/spelling_dictation/6_cropped/train/001_1400_haul.jpg
Value: ffd8ffe000104a46494600010100000100010000ffdb00430002010101010102010101020202020204030202020205040403...
1765

Key: ../data/spelling_dictation/6_cropped/valid/003_1173_witnessed.jpg
Value: ffd8ffe000104a46494600010100000100010000ffdb00430002010101010102010101020202020204030202020205040403...
Key: ../data/spelling_dictation/6_cropped/valid/003_1497_pevemant.jpg
Value: ffd8ffe000104a46494600010100000100010000ffdb00430002010101010102010101020202020204030202020205040403...
503

Key: ../data/spelling_dictation/6_cropped/test/008_1151_s_rface.jpg
Value: ffd8ffe000104a46494600010100000100010000ffdb00430002010101010102010101020202020204030202020205040403...
Key: ../data/spelling_dictation/6_cropped/test/008_1291_tare_ais.jpg
Value: ffd8ffe000104a