In [2]:
import PySpin
import numpy as np
from PIL import Image

NUM_IMAGES = 12000  # Number of images to capture
TRIGGER_SOURCE = 'Line3'  # Trigger source to Line3 (green wire)

class TriggerType:
    HARDWARE = 2

CHOSEN_TRIGGER = TriggerType.HARDWARE

def configure_trigger(cam):
    """
    Configures the camera to use a hardware trigger on Line3.
    """
    try:
        # Ensure trigger mode off
        nodemap = cam.GetNodeMap()
        node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerMode'))
        node_trigger_mode_off = node_trigger_mode.GetEntryByName('Off')
        node_trigger_mode.SetIntValue(node_trigger_mode_off.GetValue())

        # Set TriggerSelector to FrameStart
        node_trigger_selector = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerSelector'))
        node_trigger_selector_framestart = node_trigger_selector.GetEntryByName('FrameStart')
        node_trigger_selector.SetIntValue(node_trigger_selector_framestart.GetValue())

        # Select trigger source
        node_trigger_source = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerSource'))
        node_trigger_source_hardware = node_trigger_source.GetEntryByName(TRIGGER_SOURCE)
        node_trigger_source.SetIntValue(node_trigger_source_hardware.GetValue())

        # Set TriggerActivation to RisingEdge
        node_trigger_activation = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerActivation'))
        node_trigger_activation_risingedge = node_trigger_activation.GetEntryByName('RisingEdge')
        node_trigger_activation.SetIntValue(node_trigger_activation_risingedge.GetValue())

        # Turn trigger mode on
        node_trigger_mode_on = node_trigger_mode.GetEntryByName('On')
        node_trigger_mode.SetIntValue(node_trigger_mode_on.GetValue())

        print('Trigger configured to hardware (Line3) with rising edge activation.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def grab_next_image_by_trigger(cam):
    """
    Waits for hardware trigger to capture image.
    """
    try:
        # Wait for hardware trigger to capture image
        print('Waiting for hardware trigger...')
        image_result = cam.GetNextImage()

        if image_result.IsIncomplete():
            print('Image incomplete with image status %d ...' % image_result.GetImageStatus())
        else:
            print('Image grabbed successfully.')
            return image_result

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return None

    return None

def acquire_images(cam, nodemap, nodemap_tldevice):
    """
    Acquires images from the camera and stores them as numpy arrays.
    """
    try:
        images = []  # List to store numpy arrays of images

        # Set acquisition mode to continuous
        node_acquisition_mode = PySpin.CEnumerationPtr(nodemap.GetNode('AcquisitionMode'))
        node_acquisition_mode_continuous = node_acquisition_mode.GetEntryByName('Continuous')
        node_acquisition_mode.SetIntValue(node_acquisition_mode_continuous.GetValue())

        # Begin acquiring images
        cam.BeginAcquisition()
        print('Acquiring images...')

        # Retrieve device serial number for filename
        device_serial_number = ''
        node_device_serial_number = PySpin.CStringPtr(nodemap_tldevice.GetNode('DeviceSerialNumber'))
        if PySpin.IsReadable(node_device_serial_number):
            device_serial_number = node_device_serial_number.GetValue()

        # Retrieve and store images as numpy arrays
        for i in range(NUM_IMAGES):
            image_result = grab_next_image_by_trigger(cam)
            if image_result:
                # Convert image data to numpy array
                image_converted = image_result.GetNDArray()

                # Append numpy array to list
                images.append(image_converted)

                print(f'Image {i} grabbed successfully.')

                # Release image
                image_result.Release()
            else:
                print('Failed to grab image.')

        # End acquisition
        cam.EndAcquisition()

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return None

    return images

def reset_trigger(nodemap):
    """
    Returns the camera to a normal state by turning off trigger mode.
    """
    try:
        node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerMode'))
        node_trigger_mode_off = node_trigger_mode.GetEntryByName('Off')
        node_trigger_mode.SetIntValue(node_trigger_mode_off.GetValue())
        print('Trigger mode disabled.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def print_device_info(nodemap):
    """
    Prints the device information of the camera.
    """
    print('* DEVICE INFORMATION *\n')
    try:
        node_device_information = PySpin.CCategoryPtr(nodemap.GetNode('DeviceInformation'))
        if PySpin.IsReadable(node_device_information):
            features = node_device_information.GetFeatures()
            for feature in features:
                node_feature = PySpin.CValuePtr(feature)
                print(f'{node_feature.GetName()}: {node_feature.ToString() if PySpin.IsReadable(node_feature) else "Node not readable"}')
        else:
            print('Device control information not readable.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def run_single_camera(cam):
    """
    Runs the example on a single camera.
    """
    try:
        # Retrieve TL device nodemap and print device information
        nodemap_tldevice = cam.GetTLDeviceNodeMap()
        print_device_info(nodemap_tldevice)

        # Initialize camera
        cam.Init()

        # Retrieve GenICam nodemap
        nodemap = cam.GetNodeMap()

        # Configure trigger
        if not configure_trigger(cam):
            return False

        # Acquire images
        images = acquire_images(cam, nodemap, nodemap_tldevice)

        # Reset trigger
        reset_trigger(nodemap)

        # Deinitialize camera
        cam.DeInit()

        return images

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return None

def main():
    """
    Main entry point for the example.
    """
    try:
        # Retrieve singleton reference to system object
        system = PySpin.System.GetInstance()

        # Retrieve list of cameras from the system
        cam_list = system.GetCameras()
        num_cameras = cam_list.GetSize()

        print(f'Number of cameras detected: {num_cameras}')

        # Finish if there are no cameras
        if num_cameras == 0:
            cam_list.Clear()
            system.ReleaseInstance()
            print('Not enough cameras!')
            return None

        # Run example on each camera
        all_images = []
        for i, cam in enumerate(cam_list):
            print(f'Running example for camera {i}...')
            images = run_single_camera(cam)
            if images:
                all_images.extend(images)
            print(f'Camera {i} example complete...')

        # Clear camera list before releasing system
        cam_list.Clear()

        # Release system instance
        system.ReleaseInstance()

        # Save images as JPEG files
        save_images(all_images)

        return all_images

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return None

def save_images(images):
    """
    Save numpy arrays as JPEG images.
    """
    try:
        for i, img in enumerate(images):
            # Convert numpy array to Pillow Image
            img_pil = Image.fromarray(img)

            # Save image
            filename = f'image_{i}.jpg'
            img_pil.save(filename)

            print(f'Image saved as {filename}')

    except Exception as ex:
        print('Error saving images:', ex)

if __name__ == '__main__':
    main()


Number of cameras detected: 1
Running example for camera 0...
* DEVICE INFORMATION *

DeviceID: USB\VID_1E10&PID_4000&MI_00\6&2A5D0DF9&0&0000_0
DeviceSerialNumber: 16290112
DeviceUserID: 
DeviceVendorName: FLIR
DeviceModelName: Blackfly S BFS-U3-13Y3M
DeviceVersion: 1808.0.120.0
DeviceBootloaderVersion: Node not readable
DeviceType: USB3Vision
DeviceDisplayName: FLIR Blackfly S BFS-U3-13Y3M
DeviceAccessStatus: OpenReadWrite
DeviceDriverVersion: PGRUSBCam3.sys : 2.7.3.249
DeviceIsUpdater: 0
DeviceInstanceId: USB\VID_1E10&PID_4000&MI_00\6&2A5D0DF9&0&0000
DeviceLocation: 0000.0014.0000.021.000.000.000.000.000
DeviceCurrentSpeed: SuperSpeed
DeviceU3VProtocol: 1
DevicePortId: 21
GenICamXMLLocation: Device
GenICamXMLPath: 
GUIXMLLocation: Device
GUIXMLPath: Input.xml
Trigger configured to hardware (Line3) with rising edge activation.
Acquiring images...
Waiting for hardware trigger...
Image grabbed successfully.
Image 0 grabbed successfully.
Waiting for hardware trigger...
Image grabbed succ

SAVE IMAGES IN RAM AS NUMP ARRAYS AND THEN IN THEN DUMP IN THE CPU

In [None]:
import PySpin
import numpy as np
from PIL import Image

NUM_IMAGES = 10  # Number of images to capture
TRIGGER_SOURCE = 'Line3'  # Trigger source to Line3 (green wire)

class TriggerType:
    HARDWARE = 2

CHOSEN_TRIGGER = TriggerType.HARDWARE

def configure_trigger(cam):
    """
    Configures the camera to use a hardware trigger on Line3.
    """
    try:
        nodemap = cam.GetNodeMap()
        node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerMode'))
        node_trigger_mode_off = node_trigger_mode.GetEntryByName('Off')
        node_trigger_mode.SetIntValue(node_trigger_mode_off.GetValue())

        node_trigger_selector = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerSelector'))
        node_trigger_selector_framestart = node_trigger_selector.GetEntryByName('FrameStart')
        node_trigger_selector.SetIntValue(node_trigger_selector_framestart.GetValue())

        node_trigger_source = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerSource'))
        node_trigger_source_hardware = node_trigger_source.GetEntryByName(TRIGGER_SOURCE)
        node_trigger_source.SetIntValue(node_trigger_source_hardware.GetValue())

        node_trigger_activation = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerActivation'))
        node_trigger_activation_risingedge = node_trigger_activation.GetEntryByName('RisingEdge')
        node_trigger_activation.SetIntValue(node_trigger_activation_risingedge.GetValue())

        node_trigger_mode_on = node_trigger_mode.GetEntryByName('On')
        node_trigger_mode.SetIntValue(node_trigger_mode_on.GetValue())

        print('Trigger configured to hardware (Line3) with rising edge activation.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def grab_next_image_by_trigger(cam):
    """
    Waits for hardware trigger to capture image.
    """
    try:
        print('Waiting for hardware trigger...')
        image_result = cam.GetNextImage()

        if image_result.IsIncomplete():
            print('Image incomplete with image status %d ...' % image_result.GetImageStatus())
        else:
            print('Image grabbed successfully.')
            return image_result

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return None

    return None

def save_image(image_result, filename):
    """
    Saves the captured image to disk as a JPEG file.
    """
    try:
        # Convert image data to numpy array
        image_np = image_result.GetNDArray()
        # Convert numpy array to Pillow Image
        img_pil = Image.fromarray(image_np)
        # Save image
        img_pil.save(filename)
        print(f'Image saved as {filename}')
    except Exception as ex:
        print(f'Error saving image {filename}:', ex)

def acquire_images(cam, nodemap, nodemap_tldevice):
    """
    Acquires images from the camera and processes them immediately.
    """
    try:
        node_acquisition_mode = PySpin.CEnumerationPtr(nodemap.GetNode('AcquisitionMode'))
        node_acquisition_mode_continuous = node_acquisition_mode.GetEntryByName('Continuous')
        node_acquisition_mode.SetIntValue(node_acquisition_mode_continuous.GetValue())

        cam.BeginAcquisition()
        print('Acquiring images...')

        device_serial_number = ''
        node_device_serial_number = PySpin.CStringPtr(nodemap_tldevice.GetNode('DeviceSerialNumber'))
        if PySpin.IsReadable(node_device_serial_number):
            device_serial_number = node_device_serial_number.GetValue()

        for i in range(NUM_IMAGES):
            image_result = grab_next_image_by_trigger(cam)
            if image_result:
                filename = f'Trigger-{device_serial_number}-{i}.jpg'
                save_image(image_result, filename)
                image_result.Release()
            else:
                print('Failed to grab image.')

        cam.EndAcquisition()

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def reset_trigger(nodemap):
    """
    Returns the camera to a normal state by turning off trigger mode.
    """
    try:
        node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerMode'))
        node_trigger_mode_off = node_trigger_mode.GetEntryByName('Off')
        node_trigger_mode.SetIntValue(node_trigger_mode_off.GetValue())
        print('Trigger mode disabled.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def print_device_info(nodemap):
    """
    Prints the device information of the camera.
    """
    print('* DEVICE INFORMATION *\n')
    try:
        node_device_information = PySpin.CCategoryPtr(nodemap.GetNode('DeviceInformation'))
        if PySpin.IsReadable(node_device_information):
            features = node_device_information.GetFeatures()
            for feature in features:
                node_feature = PySpin.CValuePtr(feature)
                print(f'{node_feature.GetName()}: {node_feature.ToString() if PySpin.IsReadable(node_feature) else "Node not readable"}')
        else:
            print('Device control information not readable.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def run_single_camera(cam):
    """
    Runs the example on a single camera.
    """
    try:
        nodemap_tldevice = cam.GetTLDeviceNodeMap()
        print_device_info(nodemap_tldevice)

        cam.Init()

        nodemap = cam.GetNodeMap()

        if not configure_trigger(cam):
            return False

        acquire_images(cam, nodemap, nodemap_tldevice)

        reset_trigger(nodemap)

        cam.DeInit()

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def main():
    """
    Main entry point for the example.
    """
    try:
        system = PySpin.System.GetInstance()

        cam_list = system.GetCameras()
        num_cameras = cam_list.GetSize()

        print(f'Number of cameras detected: {num_cameras}')

        if num_cameras == 0:
            cam_list.Clear()
            system.ReleaseInstance()
            print('Not enough cameras!')
            return False

        for i, cam in enumerate(cam_list):
            print(f'Running example for camera {i}...')
            run_single_camera(cam)
            print(f'Camera {i} example complete...')

        cam_list.Clear()

        system.ReleaseInstance()

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

if __name__ == '__main__':
    main()


with the above code, the RAM usage was hitting 96% and then the CPU was going to 50% as the ram was getting emptied

USE RAM AS BUFFER

In [2]:
import PySpin
import numpy as np
from PIL import Image
import threading
import queue
import time

NUM_IMAGES = 12000  # Number of images to capture
TRIGGER_SOURCE = 'Line3'  # Trigger source to Line3 (green wire)
BUFFER_SIZE = 5000  # Size of the buffer

class TriggerType:
    HARDWARE = 2

CHOSEN_TRIGGER = TriggerType.HARDWARE

def configure_trigger(cam):
    """
    Configures the camera to use a hardware trigger on Line3.
    """
    try:
        nodemap = cam.GetNodeMap()
        node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerMode'))
        node_trigger_mode_off = node_trigger_mode.GetEntryByName('Off')
        node_trigger_mode.SetIntValue(node_trigger_mode_off.GetValue())

        node_trigger_selector = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerSelector'))
        node_trigger_selector_framestart = node_trigger_selector.GetEntryByName('FrameStart')
        node_trigger_selector.SetIntValue(node_trigger_selector_framestart.GetValue())

        node_trigger_source = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerSource'))
        node_trigger_source_hardware = node_trigger_source.GetEntryByName(TRIGGER_SOURCE)
        node_trigger_source.SetIntValue(node_trigger_source_hardware.GetValue())

        node_trigger_activation = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerActivation'))
        node_trigger_activation_risingedge = node_trigger_activation.GetEntryByName('RisingEdge')
        node_trigger_activation.SetIntValue(node_trigger_activation_risingedge.GetValue())

        node_trigger_mode_on = node_trigger_mode.GetEntryByName('On')
        node_trigger_mode.SetIntValue(node_trigger_mode_on.GetValue())

        print('Trigger configured to hardware (Line3) with rising edge activation.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def grab_next_image_by_trigger(cam):
    """
    Waits for hardware trigger to capture image.
    """
    try:
        print('Waiting for hardware trigger...')
        image_result = cam.GetNextImage()

        if image_result.IsIncomplete():
            print('Image incomplete with image status %d ...' % image_result.GetImageStatus())
        else:
            print('Image grabbed successfully.')
            return image_result

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return None

    return None

def save_image(image_np, filename):
    """
    Saves the captured image to disk as a JPEG file.
    """
    try:
        img_pil = Image.fromarray(image_np)
        img_pil.save(filename)
        print(f'Image saved as {filename}')
    except Exception as ex:
        print(f'Error saving image {filename}:', ex)

def acquire_images(cam, nodemap, nodemap_tldevice, image_queue):
    """
    Acquires images from the camera and puts them in a queue for processing.
    """
    try:
        node_acquisition_mode = PySpin.CEnumerationPtr(nodemap.GetNode('AcquisitionMode'))
        node_acquisition_mode_continuous = node_acquisition_mode.GetEntryByName('Continuous')
        node_acquisition_mode.SetIntValue(node_acquisition_mode_continuous.GetValue())

        cam.BeginAcquisition()
        print('Acquiring images...')

        device_serial_number = ''
        node_device_serial_number = PySpin.CStringPtr(nodemap_tldevice.GetNode('DeviceSerialNumber'))
        if PySpin.IsReadable(node_device_serial_number):
            device_serial_number = node_device_serial_number.GetValue()

        for i in range(NUM_IMAGES):
            image_result = grab_next_image_by_trigger(cam)
            if image_result:
                image_np = image_result.GetNDArray()
                filename = f'Trigger-{device_serial_number}-{i}.jpg'
                image_queue.put((image_np, filename))
                image_result.Release()
            else:
                print('Failed to grab image.')

        cam.EndAcquisition()

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def process_images(image_queue):
    """
    Processes images from the queue and saves them to disk.
    """
    while True:
        image_np, filename = image_queue.get()
        if image_np is None:
            break
        save_image(image_np, filename)
        image_queue.task_done()

def reset_trigger(nodemap):
    """
    Returns the camera to a normal state by turning off trigger mode.
    """
    try:
        node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode('TriggerMode'))
        node_trigger_mode_off = node_trigger_mode.GetEntryByName('Off')
        node_trigger_mode.SetIntValue(node_trigger_mode_off.GetValue())
        print('Trigger mode disabled.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def print_device_info(nodemap):
    """
    Prints the device information of the camera.
    """
    print('* DEVICE INFORMATION *\n')
    try:
        node_device_information = PySpin.CCategoryPtr(nodemap.GetNode('DeviceInformation'))
        if PySpin.IsReadable(node_device_information):
            features = node_device_information.GetFeatures()
            for feature in features:
                node_feature = PySpin.CValuePtr(feature)
                print(f'{node_feature.GetName()}: {node_feature.ToString() if PySpin.IsReadable(node_feature) else "Node not readable"}')
        else:
            print('Device control information not readable.')

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def run_single_camera(cam, image_queue):
    """
    Runs the example on a single camera.
    """
    try:
        nodemap_tldevice = cam.GetTLDeviceNodeMap()
        print_device_info(nodemap_tldevice)

        cam.Init()

        nodemap = cam.GetNodeMap()

        if not configure_trigger(cam):
            return False

        acquire_images(cam, nodemap, nodemap_tldevice, image_queue)

        reset_trigger(nodemap)

        cam.DeInit()

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

def main():
    """
    Main entry point for the example.
    """
    try:
        system = PySpin.System.GetInstance()

        cam_list = system.GetCameras()
        num_cameras = cam_list.GetSize()

        print(f'Number of cameras detected: {num_cameras}')

        if num_cameras == 0:
            cam_list.Clear()
            system.ReleaseInstance()
            print('Not enough cameras!')
            return False

        image_queue = queue.Queue(maxsize=BUFFER_SIZE)
        threads = []

        # Start the image processing thread
        t = threading.Thread(target=process_images, args=(image_queue,))
        t.start()
        threads.append(t)

        for i, cam in enumerate(cam_list):
            print(f'Running example for camera {i}...')
            run_single_camera(cam, image_queue)
            print(f'Camera {i} example complete...')

        # Stop the processing thread
        image_queue.put((None, None))
        for t in threads:
            t.join()

        cam_list.Clear()
        system.ReleaseInstance()

    except PySpin.SpinnakerException as ex:
        print('Error: %s' % ex)
        return False

    return True

if __name__ == '__main__':
    main()


Number of cameras detected: 1
Running example for camera 0...
* DEVICE INFORMATION *

DeviceID: USB\VID_1E10&PID_4000&MI_00\6&2A5D0DF9&0&0000_0
DeviceSerialNumber: 16290112
DeviceUserID: 
DeviceVendorName: FLIR
DeviceModelName: Blackfly S BFS-U3-13Y3M
DeviceVersion: 1808.0.120.0
DeviceBootloaderVersion: Node not readable
DeviceType: USB3Vision
DeviceDisplayName: FLIR Blackfly S BFS-U3-13Y3M
DeviceAccessStatus: OpenReadWrite
DeviceDriverVersion: PGRUSBCam3.sys : 2.7.3.249
DeviceIsUpdater: 0
DeviceInstanceId: USB\VID_1E10&PID_4000&MI_00\6&2A5D0DF9&0&0000
DeviceLocation: 0000.0014.0000.021.000.000.000.000.000
DeviceCurrentSpeed: SuperSpeed
DeviceU3VProtocol: 1
DevicePortId: 21
GenICamXMLLocation: Device
GenICamXMLPath: 
GUIXMLLocation: Device
GUIXMLPath: Input.xml
Trigger configured to hardware (Line3) with rising edge activation.
Acquiring images...
Waiting for hardware trigger...
Image grabbed successfully.
Waiting for hardware trigger...
Image saved as Trigger-16290112-0.jpg
Image grab