In [None]:
from PIL import Image, ImageChops, ImageColor
import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv
import math

In [None]:
class ContourModel:
    def __init__(self, x = 0, y = 0, w = 0, h = 0):
        self.x = x
        self.y = y
        self.w = w
        self.h = h 

Поиск искр от сварки на кадре

In [None]:
def detect_for_color(frame, lower, upper) -> ContourModel | None:    
    hsv_frame = cv.cvtColor(frame, cv.COLOR_BGR2HSV)


    mask = cv.inRange(hsv_frame, lower, upper)
    target = cv.bitwise_and(hsv_frame, hsv_frame, mask=mask)
    blur = cv.GaussianBlur(mask, (0, 0), sigmaX=10, sigmaY=10)
    
    ret, thresh = cv.threshold(blur, 40, 255, 0)
    
    cont, hier = cv.findContours(thresh, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_NONE)
    
    if len(cont) != 0:
        c = max(cont, key=cv.contourArea)
        cv.drawContours(target, c, -1, 255, 3)
        x, y, w, h = cv.boundingRect(c)
        return ContourModel(x, y, w, h)

    return None
    

Контуры движущихся предметов

In [None]:
def detect_movements(frame1, frame2) -> (cv.Mat, list[ContourModel]):
    bodies = []
    
    diff = cv.absdiff(frame1, frame2)
    diff_gray = cv.cvtColor(diff, cv.COLOR_BGR2GRAY)
    blur = cv.GaussianBlur(diff_gray, (0, 0), sigmaX=3, sigmaY=3)

    _, thresh = cv.threshold(blur, 25, 255, 0)
    dilated = cv.dilate(thresh, None, iterations=20)   


    conts, _ = cv.findContours(dilated, cv.RETR_TREE, cv.CHAIN_APPROX_SIMPLE)  

    for cont in conts:
        x, y, w, h = cv.boundingRect(cont)
        area = cv.contourArea(cont)
        if area < 40000 and area > 3000:
            bodies.append(ContourModel(x, y, w, h))
               
    return (diff, bodies)

In [None]:
class ParsedFrame:
    def __init__(self, is_sparks : bool, is_humans : bool):
        self.is_sparks = is_sparks
        self.is_humans = is_humans

In [None]:
# Выявляет наличие людей в двух кадрах, наличие искр
def analyze_two_frames(frame1 : cv.Mat, frame2 : cv.Mat) -> ParsedFrame:
    is_sparks = False
    is_humans = False

    frame1 = cv.resize(frame1, (int(frame1.shape[1] * 0.7), int(frame1.shape[0] * 0.7)))
    frame2 = cv.resize(frame2, (int(frame2.shape[1] * 0.7), int(frame2.shape[0] * 0.7)))

    #movements[0] - результат "абсолютного вычитания" двух кадров
    #movements[1] - список ContourModel, координаты прямоугольника с человеком
    movements = detect_movements(frame1, frame2)
    sides = [0, 0, frame1.shape[0], frame1.shape[1]]

    if len(movements[1]):
        is_humans = True

    for cont in movements[1]:
        containers = [cont.x, cont.y, cont.x + cont.w, cont.y + cont.h]

        # Жадно увеличиваем область вокруг каждого найденного человека, чтобы увеличить вероятность нахождения искр вокруг него
        for i, side in enumerate(sides):
            if side and i in [0, 1] and containers[i] * 0.9 >= side:
                containers[i] = int(0.9 * containers[i])
            elif containers[i] * 1.1 <= side:
                containers[i] = int(1.1 * containers[i])

        # Два разных метода получения контуров искр. Для болгарки используется динамический метод(основывается на 
        # "абсолютном вычитании" двух кадров + цвет), для сварки статический (на основе цвета)

        #movements[0] - результат "абсолютного вычитания" двух кадров  
        cropped_bolg = movements[0][containers[1] : containers[-1],
                         containers[0] : containers[2]]
        cropped_svar = frame1[containers[1]: containers[-1],
                              containers[0]: containers[2]]
        
        # Цвет искр болгарки и от сварки отличаются, сварка более синяя, в то время как болгарка желто-белая
        svar_spark = detect_for_color(
            cropped_svar, np.array([85, 125, 180]), np.array([140, 225, 255]))
        bolg_spark = detect_for_color(
            cropped_bolg, np.array([50, 0, 135]), np.array([180, 255, 255]))
        
        
        if svar_spark or bolg_spark:
            is_sparks = True
        
    return ParsedFrame(is_sparks, is_humans)

In [None]:
class States:
    Useful = 1
    Forced = 2
    Stand = 3

Полезная - наличие искр

Вынужденная - наличие людей, без искр

Простой - отсутсвие людей

In [None]:
video_path = './learn_video.mp4'
exclude_times = [ (7200, 8100), (14400, 16200), (25200, 26100), (36000, 36900)]

In [None]:
class EvalModel:
    def __init__(self, time : int, state : int):
        self.time = time
        self.state = state

In [None]:
# Препроцессинг видео

cap = cv.VideoCapture(video_path)

prev_state = States.Forced
last_good = 0

DELAY = 30
times : list[EvalModel] = []
bad_temp : list[EvalModel] = []
while True:
    ret1, frame1 = cap.read()
    ret2, frame2 = cap.read()
    if not (ret1 and ret2):
        break

    # Алгоритм определения задержки рабочего на 30 секунд. При увеличении константы, уменьшается шанс простоя
    curr_time = cap.get(cv.CAP_PROP_POS_MSEC) // 1000

    for exc in exclude_times:
        if curr_time >= exc[0] and curr_time <= exc[1]:
            continue

    data = analyze_two_frames(frame1=frame1,
                              frame2=frame2)

    if data.is_sparks:
        if curr_time - last_good >= DELAY:
            bad_temp = [EvalModel(eval_m.time, States.Stand) for eval_m in bad_temp]
        times.extend(bad_temp)
        bad_temp.clear()

        prev_state = States.Useful
        times.append(EvalModel(curr_time, prev_state))

        last_good = curr_time

    elif data.is_humans and not data.is_sparks:
        if curr_time - last_good >= 30:
            bad_temp = [EvalModel(eval_m.time, States.Stand)
                        for eval_m in bad_temp]
            
        times.extend(bad_temp)
        bad_temp.clear()

        prev_state = States.Forced
        times.append(EvalModel(curr_time, prev_state))

        last_good = curr_time
    else:
        bad_temp.append(EvalModel(curr_time, States.Stand))                


In [None]:
# Обработка выходных данных. Слияние кадров n-ой секунды в один кадр, в зависимости от медианы состояний
st = [[i.state, int(i.time)] for i in times]

prev_result = []
temp = []
class_t = 0
j = 0
for j in range(len(st)):
    if st[j][1] != class_t:
        prev_result.append(temp)
        temp = []

        class_t = st[j][1]
    temp.append(st[j])

In [None]:
result = []

# Константа влияющая на точность определения полезной работы, Много и мало плохо
USEFUL_CONST = 3
for stamp in prev_result:
    in_line = [i[0] for i in stamp]
    if in_line.count(1) > USEFUL_CONST:
        result.append([1, stamp[0][1]])
    else:
        mean = int(np.mean(in_line))
        result.append([mean, stamp[0][1]])
        

result = np.array(result)

In [None]:
inline = [i[0] for i in result]
useful = inline.count(1)
force = inline.count(2)
stand = inline.count(3)

fig, ax = plt.subplots()

states = ['Useful', 'Force', 'Stand']
counts = [useful / len(inline) * 100, force / len(inline) * 100, stand / len(inline) * 100  ]


ax.set_ylabel('Count of states (in percent)')
ax.set_xlabel('States')
ax.bar(states, counts)
plt.show()


Утилиты

In [None]:
cap = cv.VideoCapture('../examples/learn_video.mp4')

cnt = 0
modes = [ord('q'), ord('s'), ord('n')]
while True:
    ret, img = cap.read()

    cv.imshow('asd', img)

    inp = cv.waitKey(1) & 0xFF
    while inp not in modes:
        inp = cv.waitKey(1) & 0xFF

    if inp == ord('s'):
        cv.imwrite(f'./tests/{cnt}.jpg', img)
        cnt += 1
    if inp == ord('q'):
        break

cv.waitKey(0)
cv.destroyAllWindows()

In [None]:
img = cv.imread('./tests/01_out.jpg')
detect_for_color(img, np.array([60, 0, 100]), np.array([180, 255, 255]))

cv.waitKey(0)
cv.destroyAllWindows()

In [None]:
import cv2
import numpy as np


def nothing(x):
    pass


# Load image
image = cv2.imread('./tests/01_out.jpg')

# Create a window
cv2.namedWindow('image')

# Create trackbars for color change
# Hue is from 0-179 for Opencv
cv2.createTrackbar('HMin', 'image', 0, 179, nothing)
cv2.createTrackbar('SMin', 'image', 0, 255, nothing)
cv2.createTrackbar('VMin', 'image', 0, 255, nothing)
cv2.createTrackbar('HMax', 'image', 0, 179, nothing)
cv2.createTrackbar('SMax', 'image', 0, 255, nothing)
cv2.createTrackbar('VMax', 'image', 0, 255, nothing)

# Set default value for Max HSV trackbars
cv2.setTrackbarPos('HMax', 'image', 179)
cv2.setTrackbarPos('SMax', 'image', 255)
cv2.setTrackbarPos('VMax', 'image', 255)

# Initialize HSV min/max values
hMin = sMin = vMin = hMax = sMax = vMax = 0
phMin = psMin = pvMin = phMax = psMax = pvMax = 0

while (1):
    # Get current positions of all trackbars
    hMin = cv2.getTrackbarPos('HMin', 'image')
    sMin = cv2.getTrackbarPos('SMin', 'image')
    vMin = cv2.getTrackbarPos('VMin', 'image')
    hMax = cv2.getTrackbarPos('HMax', 'image')
    sMax = cv2.getTrackbarPos('SMax', 'image')
    vMax = cv2.getTrackbarPos('VMax', 'image')

    # Set minimum and maximum HSV values to display
    lower = np.array([hMin, sMin, vMin])
    upper = np.array([hMax, sMax, vMax])

    # Convert to HSV format and color threshold
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, lower, upper)
    prev_result = cv2.bitwise_and(image, image, mask=mask)

    # Print if there is a change in HSV value
    if ((phMin != hMin) | (psMin != sMin) | (pvMin != vMin) | (phMax != hMax) | (psMax != sMax) | (pvMax != vMax)):
        print("(hMin = %d , sMin = %d, vMin = %d), (hMax = %d , sMax = %d, vMax = %d)" % (
            hMin, sMin, vMin, hMax, sMax, vMax))
        phMin = hMin
        psMin = sMin
        pvMin = vMin
        phMax = hMax
        psMax = sMax
        pvMax = vMax

    # Display result image
    cv2.imshow('image', prev_result)
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cv2.destroyAllWindows()