In [16]:
import cv2
import numpy as np
import logging as log
import typing as t
from moviepy.video.fx.resize import resize
from moviepy.video.io.VideoFileClip import VideoFileClip
import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib qt

In [2]:
log.basicConfig(level=log.DEBUG 
                    ,filename="demo.log" 
                    ,filemode="w" 
                    ,format="%(asctime)s - %(name)s - %(levelname)-9s - %(filename)-8s : %(lineno)s line - %(message)s" #日志输出的格式
                    # -8表示占位符，让输出左对齐，输出长度都为8位
                    ,datefmt="%Y-%m-%d %H:%M:%S" 
                    )

log.debug = print
log.info = print
log.warning = print
log.error = print
log.critical = print
# log.debug('This message should go to the log file')
# log.info('So should this')
# log.warning('And this, too')

In [3]:
def show_frame(f: np.ndarray,delay:int = 0) -> None:
    # resized.get_frame(0).shape
    cv2.imshow("frame", f )
    cv2.waitKey(delay)
    if delay == 0:
        cv2.destroyAllWindows()

In [4]:
# 设置
# video_path = "D:\AFSSC\Documents\python学习\视频剪辑项目\手术原始视频 00_00_00-00_15_00.mp4"
video_path = R"D:\AFSSC\Documents\python学习\视频剪辑项目\手术原始视频.mp4"
template_path = R"templates\endoscope.png"
proxy_size = (214, 120)
proxy_filename = "proxy.mp4"
template_threshold = 50000

In [5]:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    log.critical("Cannot open video.")
    
log.debug(f"video:{video_path} opened.")

video:D:\AFSSC\Documents\python学习\视频剪辑项目\手术原始视频.mp4 opened.


In [6]:
total_frame = cap.get(cv2.CAP_PROP_FRAME_COUNT)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

In [7]:
tplt = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
tplt[np.where(tplt == 0 )] = 0
tplt[np.where(tplt == 50 )] = 0
tplt[np.where(tplt == 150 )] = 255
tplt[np.where(tplt == 250 )] = 0
tplt = cv2.resize(tplt,(1280,720))
tplt[np.where(tplt != 255 )] = 0
# show_frame(tplt,0)

In [8]:
sns.heatmap(tplt)

<Axes: >

In [9]:
def search_nearby_frames(cap: cv2.VideoCapture, frame: np.ndarray, threshold: int = 50000) -> t.Tuple[int, np.ndarray]:
    """
    Search nearby frames for the one that is most similar to the given frame.
    """
    min_diff = threshold
    min_frame = None
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    for i in tqdm.tqdm(range(int(total_frame))):
        ret, f = cap.read()
        if not ret:
            break
        diff = cv2.absdiff(frame, f)
        diff = np.sum(diff)
        if diff < min_diff:
            min_diff = diff
            min_frame = f
    return min_diff, min_frame

In [10]:
def masking(f,msk):
    # f = cv2.resize(f, (tplt.shape[1], tplt.shape[0]))
    msk = cv2.cvtColor(msk, cv2.COLOR_GRAY2BGR)
    f = cv2.bitwise_and(f, msk)
    return f

In [56]:
cap.set(cv2.CAP_PROP_POS_FRAMES, 114514)
testframe1  = cap.read()[1]
standard = testframe1
cap.set(cv2.CAP_PROP_POS_FRAMES, 444444)
testframe2  = cap.read()[1]

In [39]:
algo = cv2.SIFT.create()

In [40]:
kp1 , des1 = algo.detectAndCompute(testframe1, tplt)
kp2 , des2 = algo.detectAndCompute(testframe2, tplt)

In [45]:
bf = cv2.BFMatcher()
matches = bf.knnMatch(des1, des2, k=2)
# matches = bf.match(des1)
# matches = sorted(matches, key=lambda val: val.distance)
result = cv2.drawMatchesKnn(testframe1, kp1, testframe2, kp2, matches[:50], None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)

In [55]:
total_distance = 0
count = 0
for m,n in matches:
    total_distance +=m.distance
    count+=1
display(count,len(matches))
display(total_distance/(count if count else 0))

1397

1397

291.36851185803425

In [78]:
# algo = cv2.SIFT.create()
algo = cv2.ORB.create()
matcher = cv2.BFMatcher()
 
kpstd, desstd = algo.detectAndCompute(standard, tplt)
def calculate_knn_distance(src_index:int) -> float:
    global cap
    global tplt
    global algo
    global matcher
    cap.set(cv2.CAP_PROP_POS_FRAMES,src_index)
    src = cap.read()[1]
    kpsrc , dessrc = algo.detectAndCompute(src,tplt)
    matches = matcher.knnMatch(dessrc,desstd,k=2)
    total_distance = 0
    count = 0
    for m,n in matches:
        total_distance +=m.distance
        count+=1
    return total_distance/count if count else 0

    # raise NotImplementedError

In [84]:
from skimage.metrics import structural_similarity as ssim

In [94]:
def calculate_ssim(image1, image2):
    # 将图像转换为灰度图像
    gray_image1 = cv2.cvtColor(image1, cv2.COLOR_BGR2GRAY)
    gray_image2 = cv2.cvtColor(image2, cv2.COLOR_BGR2GRAY)
    # 计算SSIM
    show_frame(gray_image1)
    show_frame(gray_image2)
    score, _ = ssim(gray_image1, gray_image2, full=True)
    return score


In [100]:

similarity_threshold=0.5
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
similar_frames = []

calculate_ssim(testframe1,testframe2)

# for i in tqdm.tqdm(range(114514 - 5000, 114514 + 5000, 50)):
#     cap.set(cv2.CAP_PROP_POS_FRAMES, i)
#     ret, frame = cap.read()
#     if not ret:
#         break

#     for j in range(i + 1, frame_count, 50):
#         cap.set(cv2.CAP_PROP_POS_FRAMES, j)
#         ret, next_frame = cap.read()
#         if not ret:
#             break
#         similarity culate_ssim(frame, next_frame)
#         if similarity > similarity_threshold:
#             similar_frames.append((i, j))

# cap.release()


#
print(similar_frames)

[]


In [102]:
off = 500000
score =  []
for i in tqdm.tqdm(range(114514 - off , 114514 + off ,5000)):
    score.append(calculate_knn_distance(i))

100%|██████████| 200/200 [00:15<00:00, 12.81it/s]


In [105]:
plt.plot(score)

[<matplotlib.lines.Line2D at 0x1b9d4651840>]

In [43]:
# res = cv2.resize(result, (result.shape[1]*4, result.shape[0]*4))
imgmatches = cv2.drawMatchesKnn(testframe1, kp1, testframe2, kp2, goodmatches, None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)
show_frame(imgmatches,0)


In [151]:
# show_frame(tplt)

#### cv2 crude

In [7]:
tplt = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
tplt[np.where(tplt == 0 )] = 1
tplt[np.where(tplt == 50 )] = 0
tplt[np.where(tplt == 150 )] = 0
tplt[np.where(tplt == 250 )] = 0

In [8]:
def calculate_match_score(src: np.ndarray,template: np.ndarray,lower_thresh:int) -> int:
    return (template*src).sum() 


In [9]:
def crude_search_preprocess(frame: np.ndarray) -> np.ndarray:
    frame = cv2.resize(frame, proxy_size, None)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    frame = cv2.GaussianBlur(frame,(5,5),0)
    frame = cv2.dilate(frame, None, iterations=2)
    frame = cv2.threshold(frame, 30, None, cv2.THRESH_TOZERO, None)[1]
    frame = frame // 10
    return frame

In [10]:
match_score = [] 
match_frame = []
match_edges = []
tgt_frame_num = 200 
step = int(total_frame//tgt_frame_num)
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
last_judge = False
last_frame = 0

for i in tqdm.tqdm(range(0,int(total_frame),step)):
    ret, frame = cap.read()
    if not ret:
        break
    frame = crude_search_preprocess(frame)
    score = calculate_match_score(frame,tplt,template_threshold)
    match_score.append(score)
    match_frame.append(i - step)
    if last_judge ^ (score < template_threshold):
        match_edges.append((last_frame - step,i - step,True if last_judge else False))
    last_judge = score < template_threshold
    last_frame = i
    cap.set(cv2.CAP_PROP_POS_FRAMES, i)

    # show_frame(frame)


100%|██████████| 201/201 [00:10<00:00, 18.39it/s]


In [11]:
match_edges

[(12204, 18306, False), (189162, 195264, True), (414936, 421038, False)]

In [12]:
def binary_search(start_index: int, end_index: int ,is_upside:bool,cap: cv2.VideoCapture, 
                  template: np.ndarray, template_threshold: int) -> int:
    l = start_index 
    h = end_index
    # print(f"h:{h},l:{l}")
    while(h - l > 1):
        # print(f"h:{h},l:{l}")
        m = (h+l)//2
        cap.set(cv2.CAP_PROP_POS_FRAMES, m)
        ret, frame = cap.read()
        if not ret:
            break
        frame = crude_search_preprocess(frame)
        # show_frame(frame)
        score = calculate_match_score(frame,template,template_threshold)
        log.debug(f"m:{m},score:{score},judge:{score < template_threshold}")
        if is_upside:
            if score > template_threshold:
                h = m
            else:
                l = m
        else:
            if score > template_threshold:
                l = m
            else:
                h = m
    return h

In [13]:
binary_search(*match_edges[0],cap,tplt,template_threshold)

m:15255,score:118266,judge:False
m:16780,score:30057,judge:True
m:16017,score:116560,judge:False
m:16398,score:117192,judge:False
m:16589,score:32098,judge:True
m:16493,score:118072,judge:False
m:16541,score:117962,judge:False
m:16565,score:31628,judge:True
m:16553,score:95194,judge:False
m:16559,score:61895,judge:False
m:16562,score:43688,judge:True
m:16560,score:54409,judge:False
m:16561,score:44724,judge:True


16561

In [14]:
cap.set(cv2.CAP_PROP_POS_FRAMES, 16561)
ret, f = cap.read()
show_frame(f,0)

In [16]:
cap.set(cv2.CAP_PROP_POS_FRAMES, match_score[68])
ret, f = cap.read()
show_frame(f,0)

In [19]:
plt.plot(match_frame,match_score)
# plt.plot(match_score)
# a.sum()

[<matplotlib.lines.Line2D at 0x1dc028e3b50>]

### moviepy 

In [39]:
video = VideoFileClip(video_path,pix_fmt='gray')

In [40]:
resized = resize(video,proxy_size)

duration = resized.duration
fps = resized.fps
n_frames = int(duration * fps)

In [42]:
resized.get_frame(899).shape

(120, 214, 3)

In [22]:
show_frame(frames[3])

NameError: name 'frames' is not defined

In [67]:
cv2.destroyAllWindows()


In [3]:
def extract_features(frame):
    # 使用颜色直方图作为特征
    hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
    cv2.normalize(hist, hist)
    return hist.flatten()

def calculate_similarity(hist1, hist2):
    # 使用巴氏距离计算相似度
    return cv2.compareHist(hist1, hist2, cv2.HISTCMP_BHATTACHARYYA)

def segment_video(video_path, threshold=0.5):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Cannot open video.")
        return

    segments = []
    prev_hist = None
    segment_start = 0
    frame_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        hist = extract_features(frame)
        if prev_hist is not None:
            similarity = calculate_similarity(prev_hist, hist)
            if similarity > threshold:
                segments.append((segment_start, frame_count))
                segment_start = frame_count + 1

        prev_hist = hist
        frame_count += 1

    segments.append((segment_start, frame_count))
    cap.release()
    return segments

In [None]:
cap = cv2.VideoCapture("../")