In [None]:
import os
import time
import tensorflow as tf
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder
from object_detection.utils import config_util
import cv2 
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline

from os import listdir
from os.path import isfile, join

from lxml import etree

Getting latest model from models folder:

In [None]:
model_path = os.path.join('Tensorflow','workspace','my_models')
models_names = [name for name in os.listdir(model_path) if os.path.isdir(model_path)]
models_names = [os.path.join(model_path,f) for f in models_names]
print('MODELS BEFORE SORT: ',models_names)
models_names.sort(key=lambda x: os.path.getmtime(x))
print('MODELS AFTER SORT: ',models_names)

latest_model_name = os.path.basename(os.path.normpath(models_names[len(models_names)-1]))
print("MODEL: ",latest_model_name)

In [None]:
img_folder = 'test' #images folder with adnotation files to detect
CUSTOM_MODEL_NAME = latest_model_name
LABEL_MAP_NAME = 'label_map.pbtxt'

In [None]:
paths = {
    'ANNOTATION_PATH': os.path.join('Tensorflow', 'workspace','my_models',CUSTOM_MODEL_NAME,'dev','annotations'),
    'IMAGE_PATH': os.path.join('Tensorflow', 'workspace','images'),
    'CHECKPOINT_PATH': os.path.join('Tensorflow', 'workspace','my_models',CUSTOM_MODEL_NAME) 
 }

In [None]:
files = {
    'PIPELINE_CONFIG':os.path.join('Tensorflow', 'workspace','my_models', CUSTOM_MODEL_NAME, 'pipeline.config'),
    'LABELMAP': os.path.join(paths['ANNOTATION_PATH'], LABEL_MAP_NAME)
}


Getting latest checkpoint from models folder:

In [None]:
file_names_in_model_dir = os.path.join('Tensorflow','workspace','my_models',CUSTOM_MODEL_NAME)
ckpt_files = []
all_files_from_model_dir = [f for f in listdir(file_names_in_model_dir) if isfile(join(file_names_in_model_dir, f))] #lista nazw plikow
for i in all_files_from_model_dir :
    if i[-1]=='x': (ckpt_files.append(i))
print(ckpt_files)
numbers = []
temp_num = ""
# extract number of ckpt files:
for one_name in ckpt_files:
    for c in one_name:
        if c.isdigit(): # finding numbers in string
            temp_num = temp_num + c
    numbers.append(int(temp_num))
    temp_num=""
    max_num = max(numbers)#largest number from list
latest_checkpoint_name = "ckpt-"+str(max_num)  #name of latest checkpoint
print(CUSTOM_MODEL_NAME,'---------->',latest_checkpoint_name)

In [None]:
# Load pipeline config and build a detection model
configs = config_util.get_configs_from_pipeline_file(files['PIPELINE_CONFIG'])
detection_model = model_builder.build(model_config=configs['model'], is_training=False)

# Restore checkpoint
ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(os.path.join(paths['CHECKPOINT_PATH'],latest_checkpoint_name)).expect_partial()

@tf.function
def detect_fn(image):
    image, shapes = detection_model.preprocess(image)
    prediction_dict = detection_model.predict(image, shapes)
    detections = detection_model.postprocess(prediction_dict, shapes)
    return detections

In [None]:
category_index = label_map_util.create_category_index_from_labelmap(files['LABELMAP'])

In [None]:
TEST_PATH = os.path.join(paths['CHECKPOINT_PATH'],'dev','img',img_folder)
RIGHT_DETECTION_FILE_NAME =[]
BAD_DETECTION_FILE_NAME =[]
detection_status=""

files = [f for f in listdir(TEST_PATH) if isfile(join(TEST_PATH, f))] #lista nazw plikow
files.sort()
jpg_images = [] #list for image names
for i in files:
    if i[-1]=='g':
        jpg_images.append(i)


for image_name in jpg_images:
    IMAGE_PATH = os.path.join(TEST_PATH, image_name)
    
    img = cv2.imread(IMAGE_PATH)
    image_np = np.array(img)

    input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
    detections = detect_fn(input_tensor)

    num_detections = int(detections.pop('num_detections'))
    detections = {key: value[0, :num_detections].numpy()
                  for key, value in detections.items()}
    detections['num_detections'] = num_detections

    # detection_classes should be ints.
    detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

    label_id_offset = 1
    image_np_with_detections = image_np.copy()

    viz_utils.visualize_boxes_and_labels_on_image_array(
                image_np_with_detections,
                detections['detection_boxes'],
                detections['detection_classes']+label_id_offset,
                detections['detection_scores'],
                category_index,
                use_normalized_coordinates=True,
                max_boxes_to_draw=1,
                min_score_thresh=.1,
                agnostic_mode=False)
    
    #-----checking if detection is legit
#     image_name.rstrip('.jpg')
    do_nazwy = category_index[detections['detection_classes'][0]+label_id_offset] #wydobycie danej sekcji z nazwa
    finded_object_name = do_nazwy['name']

    XML_PATH = os.path.join(paths['CHECKPOINT_PATH'],'dev','img',img_folder,image_name.rstrip('jpg')+"xml")
    root = etree.parse(XML_PATH) #open xml file
    myroot = root.getroot() 
    label_name_from_xml = (myroot.find('.//name')).text #getting name from xml file
    if finded_object_name == label_name_from_xml:
        RIGHT_DETECTION_FILE_NAME.append(image_name)
        detection_status="YES"
    else:
        BAD_DETECTION_FILE_NAME.append(image_name)
        detection_status="NO"
    #-----checking if detection is legit
    
    
    
    print(image_name,"|","class:",detections['detection_classes'][0],"| detection:",detections['detection_scores'][0],"|",finded_object_name,"|",detection_status)
    print(detections['detection_boxes'][0])
    
    plt.imshow(cv2.cvtColor(image_np_with_detections, cv2.COLOR_BGR2RGB))
    plt.show()
print(f"Correctly detected: {len(RIGHT_DETECTION_FILE_NAME)}")
print(RIGHT_DETECTION_FILE_NAME)
print(f"Incorrectly detected: {len(BAD_DETECTION_FILE_NAME)}")
print(BAD_DETECTION_FILE_NAME)
print("Performance: ",(len(RIGHT_DETECTION_FILE_NAME)/len(jpg_images))*100,"%" )

    


# Real Time Detections from Webcam

In [None]:
# cap = cv2.VideoCapture(0)
# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# while cap.isOpened(): 
#     ret, frame = cap.read()
#     image_np = np.array(frame)
    
#     input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
#     detections = detect_fn(input_tensor)
    
#     num_detections = int(detections.pop('num_detections'))
#     detections = {key: value[0, :num_detections].numpy()
#                   for key, value in detections.items()}
#     detections['num_detections'] = num_detections

#     # detection_classes should be ints.
#     detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

#     label_id_offset = 1
#     image_np_with_detections = image_np.copy()

#     viz_utils.visualize_boxes_and_labels_on_image_array(
#                 image_np_with_detections,
#                 detections['detection_boxes'],
#                 detections['detection_classes']+label_id_offset,
#                 detections['detection_scores'],
#                 category_index,
#                 use_normalized_coordinates=True,
#                 max_boxes_to_draw=1,
#                 min_score_thresh=.3,
#                 agnostic_mode=False)

#     cv2.imshow('object detection',  cv2.resize(image_np_with_detections, (640, 480)))
    
#     if cv2.waitKey(10) & 0xFF == ord('q'):
#         cap.release()
#         cv2.destroyAllWindows()
#         break