In [2]:
import cv2
import numpy as np
import os 
from tqdm.notebook import tqdm 
import zipfile

In [3]:
# 最终可执行的python程序，用于生成光流
# load the numpy data 
def load_np_data(dir_path):
    files = os.listdir(dir_path)
    for file in files:
        data = np.load(os.path.join(dir_path,file))
        yield data,file

In [4]:
# test data
test_data = np.random.randint(0,256,size=(1,2,3))
test_data[...,1].shape

(1, 2)

In [8]:
# the data is 3D
def np2flow(data,output_dir,zip_dir):
    # caculate the frame num 
    total_frames , height , width = data.shape
    # create the window
    cv2.namedWindow('optical flow',cv2.WINDOW_GUI_NORMAL)

    if data is None:
        print('data is none')
    prvs = data[0] 

    # create hsv for opencv operation 
    hsv = np.zeros((height, width, 3), dtype=np.uint8)
    hsv[..., 1] = 255

    
    # progress_bat = tqdm(total=len(data)-1,desc = 'loading')
    for index , frame in enumerate(data[1:]):
        flow = cv2.calcOpticalFlowFarneback(prvs,frame,None, 0.5, 3, 15, 3, 5, 1.2, 0)
        flow_x , flow_y = flow[...,0] ,flow[...,1] 
        flow_x_file = os.path.join(output_dir, f'frame_{index:04d}_x.jpg')
        flow_y_file = os.path.join(output_dir, f'frame_{index:04d}_y.jpg')
        cv2.imwrite(flow_x_file, flow_x)
        cv2.imwrite(flow_y_file, flow_y)

        mag,ang = cv2.cartToPolar(flow[...,0] , flow[...,1])
        
        hsv[...,0] = ang * 180 / np.pi /2  
        hsv[...,2] = cv2.normalize(mag,None,0,255,cv2.NORM_MINMAX)

        bgr = cv2.cvtColor(hsv,cv2.COLOR_HSV2RGB)
        cv2.imshow('optical flow',bgr) 

        # 检查是否按下了 'q' 键，如果按下则退出循环
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        prvs = frame
        # progress_bat.update(1)
    # add an empty img 
    cv2.imwrite(os.path.join(output_dir,f'frame_{len(data)-1:04d}_x.jpg'),np.zeros_like(prvs))
    cv2.imwrite(os.path.join(output_dir,f'frame_{len(data)-1:04d}_y.jpg'),np.zeros_like(prvs))
    
    # progress_bat.close()
    
    # progress_bat_zip_x = tqdm(total=len(data)-1,desc = 'zip flow_x')
    # progress_bat_zip_y = tqdm(total = len(data)-1,desc = 'zip flow_y')
    with zipfile.ZipFile(os.path.join(zip_dir,'flow_x.zip'), 'w', zipfile.ZIP_DEFLATED) as zf:
        for i in range(0,len(data)):
            file_name = os.path.join(output_dir, f'frame_{i:04d}_x.jpg')
            zf.write(file_name, os.path.basename(file_name))
            # progress_bat_zip_x.update(1)
    # progress_bat_zip_x.close()

    with zipfile.ZipFile(os.path.join(zip_dir,'flow_y.zip'), 'w', zipfile.ZIP_DEFLATED) as zf:
        for i in range(0, len(data)):
            file_name = os.path.join(output_dir, f'frame_{i:04d}_y.jpg')
            zf.write(file_name, os.path.basename(file_name))
            # progress_bat_zip_y.update(1)
    
    # 清理生成的光流图像
    for i in range(0, len(data)):
        file_x = os.path.join(output_dir, f'frame_{i:04d}_x.jpg')
        file_y = os.path.join(output_dir, f'frame_{i:04d}_y.jpg')
        os.remove(file_x)
        os.remove(file_y)
    # progress_bat_zip_y.close()

In [9]:
root_dir = r"videos"
data_path = r"E:\竞赛科研\PE大创\data\pe_data_100"
data_iter = load_np_data(data_path)
files_num = len(os.listdir(data_path))
progress = tqdm(total=files_num,desc='calculating')
for index , (data,filename) in enumerate(data_iter):
    zip_dir = os.path.join(root_dir,os.path.splitext(filename)[0])
    img_output = 'img_output'
    os.makedirs(zip_dir,exist_ok=True)
    os.makedirs(img_output,exist_ok=True)
    np2flow(data,zip_dir=zip_dir,output_dir=img_output)
    progress.update(1)
progress.close()
cv2.destroyAllWindows()


calculating:   0%|          | 0/89 [00:00<?, ?it/s]

: 