In [None]:
import tensorflow as tf
import numpy as np
import os 
import glob
import random
import tensorflow.keras as keras
import keras.layers as layers 

In [None]:
IMG_W = 224
IMG_H = 224
S = 14  # number of grid cells along each side of the output map
D = 5  # depth (number of values) of each grid cell
NUM_VAL_SAMPLES = 128
SPLIT_SEED = 42  # fixed seed so the train/val split is identical on every run

# Directory where the TFRecord files are saved.
tfrecords_dir = "tfrecords_detection"

# Glob pattern matching the label files (one sub-folder per class).
LABEL_DIR = r"../datasetbuilding/labels/*/*"


# glob order depends on the filesystem, so sort before the seeded shuffle;
# otherwise the same seed could still yield different splits across machines.
annotations = sorted(glob.glob(LABEL_DIR))
random.Random(SPLIT_SEED).shuffle(annotations)
print("total de imagens: ", len(annotations))

# The first NUM_VAL_SAMPLES shuffled files form the validation split,
# everything after them is the training split.
train_annotations = annotations[NUM_VAL_SAMPLES:]
val_annotations = annotations[:NUM_VAL_SAMPLES]
print(train_annotations[0:10])


# num_samples is the number of data samples stored in each TFRecord file
# (it is expected to be less than or equal to the number of images).
# num_tfrecords is the total number of TFRecord files that will be created.
num_samples_train = 8*1024
num_tfrecords_train = len(train_annotations) // num_samples_train
if len(train_annotations) % num_samples_train:
    num_tfrecords_train += 1  # add one record if there are any remaining samples
if len(train_annotations) < num_samples_train:
    print("Erro no tamanho do TFRecord")


num_samples_val = NUM_VAL_SAMPLES
num_tfrecords_val = len(val_annotations) // num_samples_val
if len(val_annotations) % num_samples_val:
    num_tfrecords_val += 1  # add one record if there are any remaining samples


# Create the output folders. os.path.join keeps this portable: a hard-coded
# "\\" separator only yields a sub-folder on Windows, while the writers
# address the files as "<tfrecords_dir>/train/..." and "<tfrecords_dir>/val/...".
os.makedirs(os.path.join(tfrecords_dir, "train"), exist_ok=True)
os.makedirs(os.path.join(tfrecords_dir, "val"), exist_ok=True)

print("num_tfrecords for train {}".format(num_tfrecords_train))
print("num_tfrecords for val {}".format(num_tfrecords_val))

In [None]:


def bbox_str_to_float(bbox_str):
    """Convert the first five entries of ``bbox_str`` to floats.

    Args:
        bbox_str: indexable sequence with at least five entries accepted by
            ``float()``; any entries past the fifth are ignored.

    Returns:
        A list with exactly five floats, in the original order.

    Raises:
        IndexError: if fewer than five entries are present.
        ValueError: if an entry cannot be parsed as a float.
    """
    return [float(bbox_str[position]) for position in range(5)]



""" converte centroid relativo a imagem para centroid relativo da grid """
def img_to_grid_relative(bbox, S):
    """Convert an image-relative centroid into an offset inside its grid cell.

    Args:
        bbox: sequence whose first two entries are the centroid x and y,
            expressed relative to the image.
        S: number of grid cells along each axis.

    Returns:
        ``[x_offset, y_offset]``: for each axis, the coordinate scaled by ``S``
        minus its integer part, rounded to 5 decimals.
    """
    offsets = []
    for coordinate in (bbox[0], bbox[1]):
        cell_index = int(coordinate * S)
        offsets.append(np.around(coordinate * S - cell_index, 5))
    return offsets


def labelfile_to_tensor(label_file_name, S, D):
    """Build the ``(S, S, D)`` target map described by one label file.

    Every non-blank line must hold at least five whitespace-separated numbers.
    The first one is replaced by an objectness of 1 in the output; the second
    and third are the box centre x and y relative to the image; the fourth and
    fifth are copied through unchanged (presumably width and height -- confirm
    against the labelling tool).

    For each line, the cell ``[int(x * S), int(y * S)]`` receives
    ``[1, x_offset, y_offset, v4, v5]``, where the offsets are the centre
    position relative to that cell, rounded to 5 decimals. When two boxes fall
    in the same cell a message is printed and the later box overwrites the
    earlier one.

    Args:
        label_file_name: path of the text label file.
        S: number of grid cells along each axis.
        D: depth of each grid cell; five values are written per cell, so the
            assignment only succeeds when they fit a length-``D`` slot.

    Returns:
        ``numpy.ndarray`` of shape ``(S, S, D)``, zero where no box was placed.

    Raises:
        IndexError: if a non-blank line has fewer than five fields.
        ValueError: if a field is not a valid float.
    """
    output_label_map = np.zeros((S, S, D))
    with open(label_file_name, 'r') as label_file:
        for line in label_file:
            # split() tolerates tabs, repeated spaces and "\r\n" line endings.
            fields = line.split()
            if not fields:
                continue  # blank line (e.g. trailing newline at end of file)

            # bbox[0] is the leading value, replaced by objectness below.
            bbox = [float(fields[i]) for i in range(5)]

            x_scaled = bbox[1] * S
            y_scaled = bbox[2] * S

            # Clamp the cell index to the grid: a centre exactly on the right /
            # bottom border (coordinate 1.0) would otherwise index cell S and
            # raise, and a negative index would silently wrap to the far side.
            grid_x = max(0, min(int(x_scaled), S - 1))
            grid_y = max(0, min(int(y_scaled), S - 1))

            # Offsets are taken relative to the (clamped) cell actually used.
            bbox[1] = np.around(x_scaled - grid_x, 5)
            bbox[2] = np.around(y_scaled - grid_y, 5)

            if output_label_map[grid_x, grid_y, 0] == 1:
                print("Ja inserido objeto na célula {},{}".format(grid_x, grid_y))

            output_label_map[grid_x, grid_y] = np.asarray([1] + bbox[1:])

    return output_label_map



def image_feature(value):
    """Return a bytes feature holding the PNG encoding of ``value``.

    ``value`` is handed to ``tf.io.encode_png``; the resulting encoded bytes
    are stored as the single element of a ``BytesList``.
    """
    png_bytes = tf.io.encode_png(value).numpy()
    encoded = tf.train.BytesList(value=[png_bytes])
    return tf.train.Feature(bytes_list=encoded)

def output_tensor(value):
    """Return a float-list feature built from the iterable ``value``."""
    floats = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=floats)

def bytes_feature(value):
    """Return a single-element bytes feature holding ``value.encode()``."""
    encoded = tf.train.BytesList(value=[value.encode()])
    return tf.train.Feature(bytes_list=encoded)


def float_feature(value):
    """Return a float-list feature whose only element is ``value``."""
    single = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=single)


def int64_feature(value):
    """Return an int64-list feature whose only element is ``value``."""
    single = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=single)

def float_feature_list(value):
    """Return a float-list feature containing every element of ``value``."""
    floats = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=floats)


def create_example(image, path, yolo_boxes, S, D):
    """Assemble one ``tf.train.Example`` for a sample.

    Args:
        image: image passed to ``image_feature`` (stored PNG-encoded).
        path: string passed to ``bytes_feature``.
        yolo_boxes: iterable of floats passed to ``float_feature_list``.
        S: number of output grid cells, stored as an int64 feature.
        D: depth of each grid cell, stored as an int64 feature.

    Returns:
        ``tf.train.Example`` with the features "image", "path", "S", "D"
        and "yolo_boxes".
    """
    feature = dict(
        image=image_feature(image),
        path=bytes_feature(path),
        S=int64_feature(S),  # output grid cells
        D=int64_feature(D),  # depth of each grid cell
        yolo_boxes=float_feature_list(yolo_boxes),
    )
    features = tf.train.Features(feature=feature)
    return tf.train.Example(features=features)


def parse_tfrecord_fn(example):
    """Parse one serialized example and decode its image.

    The length of "yolo_boxes" is fixed from the module-level ``S`` and ``D``
    (``S*S*D`` floats), not from the "S"/"D" values stored in the record.

    Args:
        example: serialized example accepted by ``tf.io.parse_single_example``.

    Returns:
        Dict with keys "image" (PNG decoded with 3 channels), "path", "S",
        "D" and "yolo_boxes".
    """
    schema = {}
    schema["image"] = tf.io.FixedLenFeature([], tf.string)
    schema["path"] = tf.io.FixedLenFeature([], tf.string)
    schema["S"] = tf.io.FixedLenFeature([], tf.int64)  # output grid cells
    schema["D"] = tf.io.FixedLenFeature([], tf.int64)  # depth of each grid cell
    schema["yolo_boxes"] = tf.io.FixedLenFeature([S*S*D], tf.float32)  # S*S*D

    parsed = tf.io.parse_single_example(example, schema)
    decoded_image = tf.io.decode_png(parsed["image"], channels=3)
    parsed["image"] = decoded_image

    return parsed


In [None]:

# Root folder of the label files: LABEL_DIR with the glob wildcards removed.
# The replace is a no-op when the wildcards are already gone, so re-running
# this cell is safe.
LABEL_DIR = LABEL_DIR.replace("/*", "")

# Make sure the output folder exists before opening any writer.
os.makedirs(os.path.join(tfrecords_dir, "train"), exist_ok=True)

for tfrec_num in range(num_tfrecords_train):
    # Slice the TRAINING split only. Slicing the full `annotations` list would
    # also write the validation samples (its first NUM_VAL_SAMPLES entries)
    # into the training records.
    samples = train_annotations[(tfrec_num * num_samples_train) : ((tfrec_num + 1) * num_samples_train)]

    with tf.io.TFRecordWriter(tfrecords_dir + "/train/file_%.2i-%i.tfrec" % (tfrec_num, len(samples))) as writer:
        for sample in samples:
            # Path separators differ between platforms ("\\" vs "/"), so
            # normalise them before taking the class folder and the file name.
            path_parts = sample.replace("\\", "/").split("/")
            obj_class = path_parts[-2]
            obj_name = path_parts[-1].replace(".txt", "")

            image_path = f"../dataset/imagens/{obj_class}/{obj_name}.png"
            image = tf.io.decode_png(tf.io.read_file(image_path))

            label_file_name = f"{LABEL_DIR}/{obj_class}/{obj_name}.txt"
            # Flatten the (S, S, D) label map into a float32 list for the FloatList feature.
            output_label_tensor = list(labelfile_to_tensor(label_file_name, S, D).ravel().astype('float32'))

            example = create_example(image, image_path, output_label_tensor, S, D)
            writer.write(example.SerializeToString())


In [None]:
            
# Root folder of the label files, derived locally so this cell does not depend
# on another cell having already rewritten LABEL_DIR (the replace is a no-op
# when the wildcards are already gone).
label_root = LABEL_DIR.replace("/*", "")

# Make sure the output folder exists before opening any writer.
os.makedirs(os.path.join(tfrecords_dir, "val"), exist_ok=True)

for tfrec_num in range(num_tfrecords_val):
    # Slice the validation split explicitly instead of the full annotation list.
    samples = val_annotations[(tfrec_num * num_samples_val) : ((tfrec_num + 1) * num_samples_val)]

    with tf.io.TFRecordWriter(tfrecords_dir + "/val/file_%.2i-%i.tfrec" % (tfrec_num, len(samples))) as writer:
        for sample in samples:
            # Path separators differ between platforms ("\\" vs "/"), so
            # normalise them before taking the class folder and the file name.
            path_parts = sample.replace("\\", "/").split("/")
            obj_class = path_parts[-2]
            obj_name = path_parts[-1].replace(".txt", "")

            image_path = f"../dataset/imagens/{obj_class}/{obj_name}.png"
            image = tf.io.decode_png(tf.io.read_file(image_path))

            label_file_name = f"{label_root}/{obj_class}/{obj_name}.txt"
            # Flatten the (S, S, D) label map into a float32 list for the FloatList feature.
            output_label_tensor = list(labelfile_to_tensor(label_file_name, S, D).ravel().astype('float32'))

            example = create_example(image, image_path, output_label_tensor, S, D)
            writer.write(example.SerializeToString())