In [1]:
import pylablib as pll
pll.par["devices/dlls/thorlabs_tlcam"] = "path/to/dlls"
from pylablib.devices import Thorlabs
from threading import Lock

import random
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import keyboard
import h5py
import time

file_storage=r'C:\Users\tl457\OneDrive - University Of Cambridge 1\3_Code\lwel-control\test.h5'
cam_serial='16906'

cam = Thorlabs.ThorlabsTLCamera(serial="16906")

In [2]:
cam.open()
print(cam.get_detector_size())
cam.set_roi(80,1440,4,1080,1,1)
info=cam.get_roi()
print(info)
cam.close()

(1440, 1080)
(80, 1440, 4, 1080, 1, 1)


In [3]:
cam.open()
cam.set_exposure(10E-3)  # set 10ms exposure
cam.setup_acquisition(nframes=100)
cam.start_acquisition()
print(cam.get_acquisition_parameters())
N=0
arr=[]
while N<5:
    time.sleep(1) 
    cam.wait_for_frame()  # wait for the next available frame
    frame = cam.read_oldest_image()  # get the oldest image which hasn't been read yet
    arr.append(frame)
    N=N+1
cam.stop_acquisition()
cam.close()

{'nframes': 100}
0
1
2
3
4
[array([[1, 1, 1, ..., 1, 1, 1],
       [1, 1, 1, ..., 1, 2, 2],
       [1, 1, 1, ..., 1, 1, 1],
       ...,
       [1, 1, 2, ..., 1, 1, 1],
       [1, 2, 1, ..., 2, 1, 1],
       [1, 1, 2, ..., 1, 1, 1]], dtype=uint16), array([[2, 2, 1, ..., 1, 1, 1],
       [3, 1, 2, ..., 1, 2, 1],
       [1, 3, 1, ..., 1, 1, 1],
       ...,
       [2, 1, 1, ..., 1, 1, 1],
       [2, 2, 1, ..., 2, 1, 1],
       [1, 1, 2, ..., 1, 1, 1]], dtype=uint16), array([[1, 2, 1, ..., 1, 1, 1],
       [2, 1, 1, ..., 1, 2, 1],
       [2, 2, 2, ..., 1, 1, 1],
       ...,
       [2, 1, 2, ..., 1, 1, 1],
       [2, 1, 1, ..., 1, 1, 2],
       [2, 1, 2, ..., 1, 1, 1]], dtype=uint16), array([[1, 2, 1, ..., 1, 1, 1],
       [1, 1, 1, ..., 1, 1, 1],
       [1, 2, 1, ..., 2, 2, 1],
       ...,
       [2, 1, 1, ..., 1, 1, 1],
       [1, 2, 1, ..., 1, 1, 1],
       [2, 1, 2, ..., 1, 1, 1]], dtype=uint16), array([[1, 1, 2, ..., 1, 1, 1],
       [1, 1, 1, ..., 1, 1, 2],
       [2, 1, 2, ..., 1, 1, 

In [5]:
# abstract base class to represent camera
class DashCamera:
    def __init__(self, camLock, commLock):
        self._camera = None                # spectrometer
        self._camModel = ''              # model name for graph title
        self._camData = [[], []]     # wavelengths and intensities
        self._controlFunctions = {}       # behaviour upon changing controls
        self._roi_x_limit = (0,1)    # x ROI limits
        self._roi_y_limit = (0,1)     # y ROI limits
        self.roi=(100,1440,4,1080,1,1)
        self.comm_lock = commLock         # for communicating with camera
        self.cam_lock = camLock         # for editing spectrometer values

    # refreshes/populates camera properties
    def assign_cam(self):
        return

    # get data for graph
    def get_image(self):
        return self._camData

    # send each command; return successes and failures
    def send_control_values(self, commands):
        return ({}, {})
    
    # getter methods
    def model(self):
        return self._camModel

    def roi_x_limit(self):
        return self._roi_x_limit

    def roi_y_limit(self):
        return self._roi_y_limit
    
# non-demo version
class PhysicalCamera(DashCamera):
    
    def __init__(self, camLock, commLock):
        super().__init__(camLock, commLock)
        self.cam_lock.acquire()
        self.assign_cam()
        self.cam_lock.release()
        self.int_time=0.5
        self.time_interval_seconds=2
        self.number_images=5
        self._controlFunctions = {
            'integration-time-input':
            "self._cam.set_exposure",
            'roi-input':
            "self._cam.set_roi",
            'frame-rate-input':
            "self._cam.set_frame_period",
            
        }
        self.taking_images=False
        self.acquisition_ready=False
        self.number_images=5

    def assign_cam(self,serial_id='16906'):
        try:
            self.comm_lock.acquire()
            devices = Thorlabs.TLCamera.list_cameras()
            self._cam = Thorlabs.ThorlabsTLCamera(serial=serial_id)
            self._camModel = self._cam.get_device_info()[0]
            self._cam.set_exposure(self.int_time)
            self._cam.set_roi(100,1440,4,1080,1,1)
            self._cam.set_frame_period(self.time_interval_seconds)
            self._roi_x_limit = (self._cam.get_roi_limits()[0][0],self._cam.get_roi_limits()[0][1])
            self._roi_y_limit = (self._cam.get_roi_limits()[1][0],self._cam.get_roi_limits()[1][1])
        except Exception:
            pass
        finally:
            print('Camera '+str(self._camModel)+' connected with roi limits '+str(self._roi_x_limit)+' and '+str(self._roi_y_limit))
            print(self._cam.get_roi(),self._cam.get_frame_timings())
            self._cam.close()
            self.comm_lock.release()
            
    def get_image(self):
        if self._cam is None:
            try:
                self.cam_lock.acquire()
                self.assign_cam()
            except Exception:
                pass
            finally:
                self.cam_lock.release()
        try:
            self.comm_lock.acquire()
            self._cam.open()
            self._cam.set_exposure(self.int_time)
            image=self._cam.grab(1)
            self._camData = image[0]
        except Exception:
            pass
        finally:
            self._cam.close()
            self.comm_lock.release()

        return self._camData
    
    def setup_acquisition(self):
        if self._cam is None:
            try:
                self.cam_lock.acquire()
                self.assign_cam()
            except Exception:
                pass
            finally:
                self.cam_lock.release()
        try:
            self.comm_lock.acquire()
            self._cam.open()
            self._cam.setup_acquisition(nframes=100)
            print(self._cam.get_acquisition_parameters())
            self.acquisition_ready=True
            
        except Exception:
            pass
        
        finally:
            self._cam.close()
            self.comm_lock.release()

    
    def start_acquisition(self):
        if self.acquisition_ready==False:
            try:
                self.cam_lock.acquire()
                self.setup_acquisition()
            except Exception:
                pass
            finally:
                self.cam_lock.release()
        N=0
        self.comm_lock.acquire()
        self._cam.open()
        self.taking_images=True
        f=h5py.File(file_storage, "w")
        f.attrs.create("time_interval", self.time_interval_seconds)
        f.attrs.create("cam_integration_time", self.int_time)
        timestamp=datetime.now()
        f.attrs.create("creation_time", timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
        f.close()
        self._cam.start_acquisition()
        while N < self.number_images and self.taking_images: 
            if N!=0: # starting data collection immediately
                time.sleep(self.time_interval_seconds)                 
            f = h5py.File(file_storage, 'a')
            grp = f.create_group("dataset_%d" % N)
            self._cam.wait_for_frame()  # wait for the next available frame
            frame = self._cam.read_oldest_image()  # get the oldest image which hasn't been read yet
            ds = grp.create_dataset("image_%d" % N, data=frame)
            timestamp=datetime.now()
            grp.attrs['timestamp'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
            f.close()
            N += 1
            print("Image %d of %d recorded" % (N,self.number_images))
        print("Done!\n")
        self._cam.stop_acquisition()
        self._cam.close()
        self.taking_images=False
        self.comm_lock.release()

    def send_control_values(self, commands):
        failed = {}
        succeeded = {}
        
        for ctrl_id in commands:
            try:
                self.comm_lock.acquire()
                eval(self._controlFunctions[ctrl_id])(commands[ctrl_id])
                succeeded[ctrl_id] = str(commands[ctrl_id])
            except Exception as e:
                failed[ctrl_id] = str(e).strip('b')
            finally:
                self.comm_lock.release()
                
        return(failed, succeeded)
            
    def model(self):
        self.cam_lock.acquire()
        self._cam.open()
        self.assign_cam()
        self._cam.close()
        self.cam_lock.release()
        return self._camModel
    
    def roi_x_limit(self):
        self.cam_lock.acquire()
        self._cam.open()
        self.assign_cam()
        self._cam.close()
        self.cam_lock.release() 
        return self._roi_x_limit

    def roi_y_limit(self):
        self.cam_lock.acquire()
        self._cam.open()
        self.assign_cam()
        self._cam.close()
        self.cam_lock.release() 
        return self._roi_y_limit

In [6]:
#############################
# Camera properties
#############################

# lock for modifying information about spectrometer
cam_lock = Lock()
# lock for communicating with spectrometer
comm_lock = Lock()

# initialize spec
cam = PhysicalCamera(cam_lock, comm_lock)
cam.assign_cam()

Camera CS165CU connected with roi limits (0, 1) and (0, 1)
(80, 1440, 4, 1080, 1, 1) TAcqTimings(exposure=0.009998, frame_period=0.02862)
Camera CS165CU connected with roi limits (0, 1) and (0, 1)
(100, 1440, 4, 1080, 1, 1) TAcqTimings(exposure=0.500008, frame_period=0.500096)


In [7]:
image=cam.get_image()

In [8]:
print(len(image))

1076


In [9]:
cam.setup_acquisition()

{'nframes': 100}


In [10]:
cam.start_acquisition()

Image 1 of 5 recorded
Image 2 of 5 recorded
Image 3 of 5 recorded
Image 4 of 5 recorded
Image 5 of 5 recorded
Done!



In [None]:
print(cam.taking_images)