In [None]:
import os
import shutil

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from ultralytics import YOLO

In [None]:
def clear_folder(folder_path):
    """Delete everything inside ``folder_path`` while keeping the folder itself.

    Real sub-directories are removed recursively; every other entry (regular
    files, symlinks of any kind, broken links) is unlinked.

    Parameters
    ----------
    folder_path : str
        Directory whose contents should be removed.

    Returns
    -------
    None
        A status line is printed either way; a missing folder is reported,
        not treated as an error.
    """
    if not os.path.exists(folder_path):
        print("Folder not found:", folder_path)
        return

    # Snapshot the listing first so we never delete while the iterator is live.
    with os.scandir(folder_path) as listing:
        entries = list(listing)

    for entry in entries:
        # follow_symlinks=False: shutil.rmtree refuses symlinks, so a link that
        # points at a directory has to be unlinked like a file instead.
        if entry.is_dir(follow_symlinks=False):
            shutil.rmtree(entry.path)
        else:
            os.unlink(entry.path)

    print("Cleared contents of folder:", folder_path)

In [None]:
def crop_2pen_mid(image):
    """Load an image file and split it into left and right halves.

    The cut is made at the horizontal midpoint ``w // 2``; for an odd width the
    right half receives the extra column.

    Parameters
    ----------
    image : str
        Path of the image file, passed to ``cv2.imread``.

    Returns
    -------
    tuple
        ``(left_img, right_img)`` slices of the loaded image, full height each.

    Raises
    ------
    OSError
        If ``cv2.imread`` cannot load the file.
    """
    img = cv2.imread(image)
    if img is None:
        # cv2.imread reports failure by returning None rather than raising;
        # fail here with the offending path instead of on `.shape` below.
        raise OSError(f"cv2.imread could not read image: {image}")

    # Find the middle column.
    width = img.shape[1]
    midpoint = width // 2

    left_img = img[:, :midpoint]
    right_img = img[:, midpoint:]
    return left_img, right_img

In [None]:
def _prepare_empty_dir(path):
    """Ensure ``path`` exists and is empty: clear it if present, create it otherwise."""
    if os.path.exists(path):
        clear_folder(path)
    else:
        os.makedirs(path)


def three_min_sample_crop(video, output_path, time_min = 3):
    """Sample frames from a video at a fixed interval and split each one in half.

    Sampled frames are written to ``<output_path>/sample/<frame_index>.jpg``.
    Each sample is then split by ``crop_2pen_mid`` and the halves are written
    under the same file name to ``<output_path>/l_crop`` and
    ``<output_path>/r_crop``. All three folders are emptied (or created) on
    every call.

    Parameters
    ----------
    video : str
        Path handed to ``cv2.VideoCapture``.
    output_path : str
        Directory that receives the ``sample``, ``l_crop`` and ``r_crop`` folders.
    time_min : float, default 3
        Sampling interval in minutes (``1/6`` means one frame every 10 seconds).

    Raises
    ------
    OSError
        If the video cannot be opened.
    ValueError
        If the reported FPS or ``time_min`` is not positive.
    """
    sample_path = os.path.join(output_path,'sample')
    l_crop_path = os.path.join(output_path,'l_crop')
    r_crop_path = os.path.join(output_path,'r_crop')

    counter = 0
    vid = cv2.VideoCapture(video)
    try:
        # Validate the input before any existing output is wiped.
        if not vid.isOpened():
            raise OSError(f'cannot open video: {video}')
        fps = vid.get(cv2.CAP_PROP_FPS)
        total_frame = vid.get(cv2.CAP_PROP_FRAME_COUNT)
        if fps <= 0:
            raise ValueError(f'video reports a non-positive FPS ({fps}): {video}')
        if time_min <= 0:
            raise ValueError(f'time_min must be positive, got {time_min}')

        vid_len_min = total_frame/fps/60
        print('Video FPS:','%.2f'%fps)
        print('Video length(min):','%.2f'%vid_len_min)
        print('expect image:','%.0f'%(vid_len_min/time_min))

        # Sample images by time interval. The step is exactly one interval's
        # worth of frames (at least 1), so samples do not drift over the video.
        time_step = time_min*60
        frame_step = max(1, int(round(fps*time_step,0)))

        _prepare_empty_dir(sample_path)
        for frame in range(0,int(total_frame),frame_step):
            vid.set(cv2.CAP_PROP_POS_FRAMES, frame)
            success, frame_img = vid.read()
            if not success:
                continue
            cv2.imwrite(os.path.join(sample_path,f'{frame}.jpg'),frame_img)
            print(f'extract frame {frame} and save in {sample_path}')
            # Count only frames that were actually read and saved.
            counter += 1
    finally:
        # Release the capture even when sampling fails part-way.
        vid.release()

    # Image cropping: one left and one right half per sampled frame.
    _prepare_empty_dir(r_crop_path)
    _prepare_empty_dir(l_crop_path)

    for pic in os.listdir(sample_path):
        left,right = crop_2pen_mid(os.path.join(sample_path,pic))
        cv2.imwrite(os.path.join(l_crop_path,pic),left)
        cv2.imwrite(os.path.join(r_crop_path,pic),right)
        print('crop ', pic)
    print(f'extract {video} with crop to {counter} images')

In [None]:
# Gather one image from the video every 10 seconds: time_min is expressed in
# minutes, and 6**-1 is one sixth of a minute.
# NOTE(review): in the visible notebook, video_file and output_path are only
# assigned inside the batch loop further down — confirm they are defined
# before this cell runs on a fresh kernel.
three_min_sample_crop(
    video=video_file,
    output_path=output_path,
    time_min=6**-1,
)

In [None]:
def _predict_postures(model, folder, names):
    """Run ``model.predict`` on each named image in ``folder``.

    Returns one entry per prediction result: the value at column 5 of the
    single kept detection row, or ``None`` when nothing was detected.
    """
    postures = []
    for name in names:
        # Full paths are passed so the working directory never has to change.
        results = model.predict(os.path.join(folder, name), max_det = 1, stream=True)
        for result in results:
            detections = result.boxes.cpu().numpy().data
            # Column 5 is read as the posture class (Ultralytics documents box
            # rows as x1, y1, x2, y2, conf, cls).
            postures.append(detections[0][5] if len(detections) else None)
    return postures


def _count_posture_changes(postures, window, side):
    """Count posture changes between consecutive frames, one total per window.

    A total is emitted (and the running count reset) whenever the frame index
    is a multiple of ``window``. Pairs whose earlier frame has no detection are
    reported and skipped.
    """
    counts = []
    changes = 0
    for idx in range(1, len(postures)):
        previous, current = postures[idx - 1], postures[idx]
        if previous is None:
            print(f"{side} stall {idx-1} not detect")
        elif current is None or previous != current:
            # NOTE(review): as in the original comparison, detected -> not
            # detected counts as one change; confirm this is intended.
            changes += 1

        if idx % window == 0:
            # Close the window even when the previous frame was undetected, so
            # counts never leak into the next window.
            counts.append(changes)
            changes = 0
    return counts


def PC_count(img_path, model, peroid = 10):
    """Count postural changes per window for the left and right crops.

    Image names are taken from ``<img_path>/sample`` and must be integers plus
    an extension; they are processed in ascending numeric order. Files with the
    same names are read from ``<img_path>/l_crop`` and ``<img_path>/r_crop``.

    Parameters
    ----------
    img_path : str
        Folder containing the ``sample``, ``l_crop`` and ``r_crop`` sub-folders.
    model : object
        Detector exposing ``predict(source, max_det=..., stream=...)`` whose
        results carry ``boxes`` (e.g. an Ultralytics ``YOLO`` model).
    peroid : int, default 10
        Window size. The windows actually close every ``peroid + 1`` frame
        indices, exactly as in the original implementation.

    Returns
    -------
    tuple
        ``(l_PC_list, r_PC_list, item)``: the per-window change counts for each
        side, and ``item = int(n / (peroid + 1))`` where ``n`` is the number of
        right-side predictions (minimum 1). ``item`` can be one larger than the
        list lengths when ``n`` is an exact multiple of the window.
    """
    sample_path = os.path.join(img_path,'sample')
    lcrop_path = os.path.join(img_path,'l_crop')
    rcrop_path = os.path.join(img_path,'r_crop')

    # Sort numerically: a plain string sort would put '100.jpg' before '20.jpg'.
    sort_list = sorted(os.listdir(sample_path),
                       key=lambda name: int(os.path.splitext(os.path.basename(name))[0]))

    l_posture = _predict_postures(model, lcrop_path, sort_list)
    r_posture = _predict_postures(model, rcrop_path, sort_list)

    window = peroid + 1
    l_PC_list = _count_posture_changes(l_posture, window, "left")
    r_PC_list = _count_posture_changes(r_posture, window, "right")

    # Same value the original derived from its final right-side loop counter.
    item = int(max(len(r_posture), 1)/window)

    torch.cuda.empty_cache()

    return l_PC_list, r_PC_list, item
        
    
    

In [None]:
####example batch detection
# For every video: sample a frame every 10 seconds, split each frame into the
# left/right pen, count posture changes per window, and save the left-pen
# counts to the matching CSV file.
vid_path = [r"D:\PigAbModel\nom_vid\1910\Camera 01_S20231019123300_E20231019140000.mp4",
            r"D:\PigAbModel\nom_vid\1910\Camera 01_S20231019140002_E20231019144817.mp4",
            r"D:\PigAbModel\nom_vid\1910\Camera 01_S20231019144802_E20231019163051.mp4",
            r"D:\PigAbModel\nom_vid\1910\Camera 01_S20231019060003_E20231019092754.mp4",
            r"D:\PigAbModel\nom_vid\1910\Camera 01_S20231019100003_E20231019123321.mp4"
           ]
csv_name = ['pd1910_3.csv', 
            'pd1910_4.csv', 
            'pd1910_5.csv',
            'pd1910_1.csv', 
            'pd1910_2.csv',
           ]

# zip() would silently drop the unmatched tail of the longer list.
if len(vid_path) != len(csv_name):
    raise ValueError(f'{len(vid_path)} videos but {len(csv_name)} csv names')

# Load the weights once, up front: the path is relative, so it must be resolved
# before anything changes the working directory, and reloading per video is
# wasted work.
model = YOLO("Posturemodel/runs/detect/1203242/weights/best.pt")
output_path = "D:\\PigAbModel\\nom_vid\\test_vid_extract"
csv_dir = 'D:\\OneDrive - Mahidol University\\HSP_project\\preparedDS'

for video_file, k in zip(vid_path,csv_name):
    three_min_sample_crop(video_file, output_path, time_min = 6**-1)
    
    # 5min PC
    l_PC_list, r_PC_list,item = PC_count(img_path = output_path, model = model, peroid = 30)
    time_axis = range(item)
    
    # NOTE(review): only the left-pen counts are exported; r_PC_list is
    # computed but unused — confirm whether an R_PC column is wanted.
    df = pd.DataFrame(list(zip(time_axis,l_PC_list)), columns = ['time','L_PC'])
    # Write to an explicit path instead of os.chdir, so the working directory
    # is not changed as a side effect.
    df.to_csv(os.path.join(csv_dir, k))