from harvesters.core import Harvester import cv2 import os import sys import numpy as np WIDTH = 1920 # Image buffer width HEIGHT = 1200 # Image buffer height WARM_UP_FRAMES = 200 PIXEL_FORMAT = "BayerRG8" # Camera pixel format SDK_CTI_PATH = "C:/Program Files/Common Files/Sentech/GenTL/StGenTL.cti" # Sentech .cti # SDK_CTI_PATH = "C:/Program Files/MATRIX VISION/mvIMPACT Acquire/bin/x64/mvGenTLProducer.cti" # MATRIX VISION .cti CAMERA_MODEL = "STC-MCS241U3V" # Camera product model # ------------------------------------------------------------------------------- # Utility # ------------------------------------------------------------------------------- # UI progress bar def progress(count, total, status=''): bar_len = 40 filled_len = int(round(bar_len * count / float(total))) percents = round(100.0 * count / float(total), 1) bar = u'\u2588' * filled_len + '.' * (bar_len - filled_len) sys.stdout.write('|%s| %s%s - %s\r' % (bar, percents, '%', status)) sys.stdout.flush() # ------------------------------------------------------------------------------- # Camera initialization and shutdown # ------------------------------------------------------------------------------- def init_camera(fps): ia = None h = Harvester() h.add_file(SDK_CTI_PATH) h.update() try: ia = h.create_image_acquirer(model=CAMERA_MODEL) except: print("[ERROR] Camera with specified model \"" + CAMERA_MODEL + "\" is busy or not connected.") exit(1) ia.remote_device.node_map.Width.value = WIDTH ia.remote_device.node_map.Height.value = HEIGHT ia.remote_device.node_map.PixelFormat.value = PIXEL_FORMAT ia.remote_device.node_map.AcquisitionFrameRate.value = fps ia.timeout_for_image_acquisition = 1000 ia.start_acquisition() return h, ia def shutdown_camera(image_acquirer, harvester): image_acquirer.stop_acquisition() image_acquirer.destroy() harvester.reset() # ------------------------------------------------------------------------------- # Main # ------------------------------------------------------------------------------- def run(): # Ask user for parameters file_name = input("\n[INPUT] Enter video name (without extension): ") n_frames = int(input("\n[INPUT] Enter how many frames to record: ")) fps = int(input("\n[INPUT] Enter how many FPS: ")) # frames = np.zeros([n_frames, 1200, 1920, 3], dtype=np.uint8) frames = np.zeros([n_frames, 1200, 1920], dtype=np.uint8) # Init camera print("\n[INFO] Connecting to camera, please wait...\n") h, ia = init_camera(fps) print("[INFO] Connected successfully!\n") # Camera warm up loop print("[INFO] Warming up camera, please wait...\n") for i in range(WARM_UP_FRAMES): print("Before fetching") buffer = ia.fetch_buffer() print("After fetching") buffer.queue() progress(i, n_frames, status="Warming up...") print("[INFO] Camera is ready!\n") # Variables for timestamp's deltas handling (not interesting) dts = [-1.0] * (n_frames - 1) logs = [-1.0] * n_frames log_string = "" exp_dt = 1 / fps prev_ts = -1.0 ns_to_s = 1 / 1000000000 # Actual acquisition loop for i in range(n_frames): with ia.fetch_buffer() as buffer: # np.copyto(frames[i], cv2.cvtColor(buffer.payload.components[0].data.reshape(buffer.payload.components[0].height, buffer.payload.components[0].width), cv2.COLOR_BAYER_RG2RGB)) np.copyto(frames[i], buffer.payload.components[0].data.reshape(buffer.payload.components[0].height, buffer.payload.components[0].width)) timestamp = buffer.timestamp_ns if i == 0: prev_ts = timestamp else: dts[i - 1] = timestamp - prev_ts prev_ts = timestamp progress(i, n_frames, status="Capturing...") # Timestamp post-processing and video writing (not interesting) k = 0 # Logging sum = 0 for i in range(len(dts)): current_dt = dts[i] * ns_to_s delta = abs(current_dt - exp_dt) sum += current_dt if delta >= 0.00001: logs[k] = current_dt k += 1 cap_fps = (n_frames - 1) / sum print("\n\n[INFO] Video captured successfully! (FPS: " + str('{:.2f}'.format(cap_fps)) + ")\n") if k > 0: log_string += "[INFO] The following " + str(k) + " intervals out of " + str( n_frames) + " do not match the goal (" + str(fps) + " FPS -> " + str( '{:.6f}'.format(exp_dt)) + " s):\n" for i in range(k): log_string += str('{:.6f}'.format(logs[i])) + " (+/- " + str( '{:.6f}'.format(logs[i] - exp_dt)) + " s)\n" else: log_string += "[INFO] All intervals match the goal! (" + str(fps) + " FPS -> " + str( '{:.6f}'.format(exp_dt)) + " s):\n" with open(file_name + ".txt", "w") as text_file: text_file.write("Goal: " + str(exp_dt) + " s\n\n") for el in dts: text_file.write(str(el) + "\n") print(log_string) # Output video writing (not interesting) rec_fps = round(cap_fps) out = cv2.VideoWriter(file_name + ".avi", cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'), rec_fps, (WIDTH, HEIGHT)) for i in range(len(frames)): out.write(frames[i]) progress(i, n_frames, status="Writing...") path = os.getcwd() + "\\" + file_name + ".avi" print("\n\n[INFO] Video written successfully!\n") print("[INFO] Path: " + path) # Workaround to avoid printing some strange errors when camera is shutdown (not interesting) original_stderr = sys.stderr sys.stderr = open(os.devnull, "w") shutdown_camera(ia, h) # Workaround to avoid printing some strange errors when camera is shutdown (not interesting) sys.stderr = original_stderr exit(0) if __name__ == "__main__": run()