In [91]:
import cv2
from os import listdir
from os.path import isfile, join
from ultralytics import YOLO
import torch
import base64

In [None]:
# IMAGE CROP

# PATH = './train/tree/'

# onlyfiles = [f for f in listdir(PATH) if isfile(join(PATH, f))]

# for path in onlyfiles:
#     cap = cv2.VideoCapture(f'{PATH}/{path}')
#     fps = cap.get(cv2.CAP_PROP_FPS)
#     frame_number = 127 * fps 
#     cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number-1)
#     res, frame = cap.read()
#     img_path = f"images/Tree//{path[:-4]}_tree.jpg"
#     try:
#         cv2.imwrite(img_path, frame)
#     except:continue

In [109]:
class CDWnet:
    def __init__(self, hard_model, light_model = None):
        self.light_model_path = light_model
        self.hard_model_path = hard_model
        self.cuda_flag = False
        self.detect_model_classes = None
        self.video_path = None

        if torch.cuda.is_available():
            self.cuda_flag = True 
        self.prepare_model()

    def prepare_model(self):
        if self.light_model_path is not None:
            self.light_model = YOLO(self.light_model_path)
            self.detect_model_classes = self.light_model.names
            self.light_model.to('cuda') 

        if self.hard_model_path is not None:
            self.hard_model = YOLO(self.hard_model_path)
            self.detect_model_classes = self.hard_model.names
            self.hard_model.to('cuda') 

    def handle_result(self, result):
        for res in result:
            boxes = res.boxes.cpu().numpy()
            images_data = []

            for box in boxes:
                class_name = self.detect_model_classes[int(box.cls)]
                xyxy = box.xyxy[0]
                confidence = str(round(box.conf[0].item(), 2))
                label = f'{class_name}: {confidence}'

                images_data.append([class_name, confidence, xyxy, label])
        
        most_conf_class = max(images_data, key = lambda x: x[1])
        return most_conf_class

    
    def post_process(self, detection_results):
        vals = list(detection_results.values())
        cls_list = [i[0] for i in vals]

        final_class = max(cls_list, key=cls_list.count)
        cnf_list = [i for i in vals if i[0] == final_class]

        max_conf = max(cnf_list, key = lambda x: x[1])
        frame_num = list(detection_results.keys())[vals.index(max_conf)]


        cap = cv2.VideoCapture(self.video_path)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num-1) # -1?
        success, frame = cap.read()

        frame = plot_boxes(frame, max_conf[2], max_conf[3])

        success, buffer = cv2.imencode('.jpg', frame)
        base64_img = base64.b64encode(buffer)

        return final_class, base64_img


    def process_hard(self, video_path):
        cap = cv2.VideoCapture(video_path)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_number = 120 * fps # 2:05 - 2:15
        last_frame_number = 135 * fps
        conf = 0.5
        frame_skip = 11

        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number-1)
        detection_results = dict()

        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break
            frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)

            if frame_id == last_frame_number:
                break
            result = self.hard_model(frame, verbose=False, conf = conf)
            detection_results[frame_id] = self.handle_result(result)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id+frame_skip)
        cap.release()
        return detection_results
        
    # def process_light(self):
        # cap = cv2.VideoCapture(data_path)
        # frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))


        # while cap.isOpened():
        #     frames = []
        #     frame_ids = []
        #     timestamps = []
        #     for i in range(self.batch_size):
        #         success, frame = cap.read()
        #         if not success:
        #             break
        #         timestamps.append(cap.get(cv2.CAP_PROP_POS_MSEC))
        #         frame_ids.append(int(cap.get(cv2.CAP_PROP_POS_FRAMES)))
        #         frames.append(frame)
        #     if len(frames) != 0:
        #         frames = self.process_batch(frames, timestamps, frame_ids, save)
        #         if save:
        #             for frame in frames:
        #                 out.write(frame)
        #     if not success:
        #         break

    def predict(self, video_path, mode = 'hard_mode'):
        self.video_path = video_path
        if mode == 'hard_mode':
            result = self.process_hard(video_path)
            return self.post_process(result)
        # elif self.mode == 'light_mode':
        #     self.process_light(video_path)

In [110]:
HARD_MODEL = '../models/yolov8l_e20_b8_im720.pt'
VIDEO_PATH = '../data/3554032.mp4'

model = CDWnet(hard_model=HARD_MODEL)

In [111]:
res_class = model.predict(VIDEO_PATH)
print(res_class)

('Concrete', b'/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAIBAQEBAQIBAQECAgICAgQDAgICAgUEBAMEBgUGBgYFBgYGBwkIBgcJBwYGCAsICQoKCgoKBggLDAsKDAkKCgr/2wBDAQICAgICAgUDAwUKBwYHCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgoKCgr/wAARCALQAtADASIAAhEBAxEB/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/8QAHwEAAwEBAQEBAQEBAQAAAAAAAAECAwQFBgcICQoL/8QAtREAAgECBAQDBAcFBAQAAQJ3AAECAxEEBSExBhJBUQdhcRMiMoEIFEKRobHBCSMzUvAVYnLRChYkNOEl8RcYGRomJygpKjU2Nzg5OkNERUZHSElKU1RVVldYWVpjZGVmZ2hpanN0dXZ3eHl6goOEhYaHiImKkpOUlZaXmJmaoqOkpaanqKmqsrO0tba3uLm6wsPExcbHyMnK0tPU1dbX2Nna4uPk5ebn6Onq8vP09fb3+Pn6/9oADAMBAAIRAxEAPwD+f+il2H1FGw+ooASiiigD9I/iD/wQb+C37H3wT+H/AI8/4Klf8FEf+FA+MPH39q/Y/A3/AAqO68VeR9huERv9N0m9kibdDPaTchcfadg3GNiOG/bM/wCCK5+Cv7FPhv8A4KH/ALHn7SJ+Ofwg1M3g8R+LV

In [5]:
a = [[1, 2],[2, 1]]

a = max(a, key = lambda x: x[1])

In [7]:
a[0]

1

In [1]:
def plot_boxes(frame, xyxy, label):  # plot detected class box
    x1 = int(xyxy[0])
    y1 = int(xyxy[1])
    x2 = int(xyxy[2])
    y2 = int(xyxy[3])

    (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
    frame = cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), (0, 0, 255), -1)
    frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
    frame = cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
    return frame

In [90]:
a = {'1': ['a', 1], '2': ['b', 2], '3':['b', 3]}

def plot_boxes(frame, xyxy, label):  # plot detected class box
    x1 = int(xyxy[0])
    y1 = int(xyxy[1])
    x2 = int(xyxy[2])
    y2 = int(xyxy[3])

    (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
    frame = cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), (0, 0, 255), -1)
    frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
    frame = cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
    return frame


def postprocess(self, detection_results):

    vals = list(detection_results.values())
    cls_list = [i[0] for i in vals]

    final_class = max(cls_temp, key=cls_list.count)
    cnf_list = [i for i in vals if i[0] == final_class]


    max_conf = max(cnf_list, key = lambda x: x[1])
    frame_num = list(detection_results.keys())[vals.index(max_conf)]

    print(final_class, frame_num)

    cap = cv2.VideoCapture(self.video_path)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num-1) # -1?
    success, frame = cap.read()


In [24]:
a

[['a', 1], ['b', 2], ['b', 3]]

In [54]:
temp[:, 0]

array(['a', 'b', 'b'], dtype='<U11')

In [36]:
temp

[['a', 1], ['b', 2], ['b', 3]]