In [4]:
import ctypes
import time
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
import qcodes_contrib_drivers
from qcodes_contrib_drivers.drivers.Thorlabs.K10CR1 import Thorlabs_K10CR1

# Devices classes programming

## Old kinesis controller code

In [None]:
class KinesysController:
    def __init__(self, serial_num, dll_lib):
        self.serial_num = serial_num
        self.lib = dll_lib

    def open_device(self):
        if self.lib.TLI_BuildDeviceList() == 0:
            self.lib.CC_Open(self.serial_num)
            self.lib.CC_StartPolling(self.serial_num, ctypes.c_int(200))
            self.lib.CC_Home(self.serial_num)
            time.sleep(1)
            # Параметры устройства
            self.lib.CC_SetMotorParamsExt(self.serial_num,
                                         ctypes.c_double(1919.64186),
                                         ctypes.c_double(1.0),
                                         ctypes.c_double(1.0))
            self.lib.CC_RequestPosition(self.serial_num)
            time.sleep(0.2)
            dev_pos = ctypes.c_int(self.lib.CC_GetPosition(self.serial_num))
            real_pos = ctypes.c_double()
            self.lib.CC_GetRealValueFromDeviceUnit(self.serial_num, dev_pos, ctypes.byref(real_pos), 0)
            print(f'Position after homing: {real_pos.value}')

    def rotate_to(self, angle_deg):
        pos_real = ctypes.c_double(angle_deg)
        pos_dev = ctypes.c_int()
        self.lib.CC_GetDeviceUnitFromRealValue(self.serial_num, pos_real, ctypes.byref(pos_dev), 0)
        print(f'Move to angle: {pos_real.value} degrees (device units: {pos_dev.value})')
        self.lib.CC_SetMoveAbsolutePosition(self.serial_num, pos_dev)
        time.sleep(0.25)
        self.lib.CC_MoveAbsolute(self.serial_num)
        self.lib.CC_RequestPosition(self.serial_num)
        time.sleep(0.2)
        return pos_real.value

    def rotate_relative(self, delta_angle_deg):
        pos_dev_curr = ctypes.c_int(self.lib.CC_GetPosition(self.serial_num))
        pos_real_curr = ctypes.c_double()
        self.lib.CC_GetRealValueFromDeviceUnit(self.serial_num, pos_dev_curr, ctypes.byref(pos_real_curr), 0)
        new_real = ctypes.c_double(pos_real_curr.value + delta_angle_deg)
        pos_dev_new = ctypes.c_int()
        self.lib.CC_GetDeviceUnitFromRealValue(self.serial_num, new_real, ctypes.byref(pos_dev_new), 0)
        print(f'Move relative by: {delta_angle_deg} degrees to {new_real.value} (device units: {pos_dev_new.value})')
        self.lib.CC_SetMoveAbsolutePosition(self.serial_num, pos_dev_new)
        time.sleep(0.25)
        self.lib.CC_MoveAbsolute(self.serial_num)
        self.lib.CC_RequestPosition(self.serial_num)
        time.sleep(0.2)
        return new_real.value


## Jog old rotator control (kdc101)

In [None]:
from pylablib.devices import Thorlabs
Thorlabs.list_kinesis_devices()
rotator = Thorlabs.KinesisMotor("27254803")
rotator.get_scale()
rotator.get_velocity_parameters()


rotator.move_to(0)

## New rotator (KD10CR)

In [None]:
class K10CR1_rotator:
    def __init__(self):
        self.apt = qcodes_contrib_drivers.drivers.Thorlabs.private.APT.Thorlabs_APT()
        self.inst = Thorlabs_K10CR1("K10CR1", 0, self.apt)
        # Move to zero and recalibrate
        self.inst.move_home()

        # Read position
        print("Position:", self.inst.position())
        self.current_pos = self.inst.position()
        self.current_max_angular_velocity = self.inst._get_velocity_max()
    def position_info(self, print: bool = True):
        self.current_pos = self.inst._get_position()
        if print:
            print("Position:", self.current_pos)

        return self.current_pos
    def rotation_to_angle(self, deg:float):
        try:
            if self.current_pos != deg:
                self.inst.position(deg)
                self.position_info()
        except:
            print('Rotator is currently at this position')
        


    def jog(self, max_velocity: float = 10, direction: str = 'fwd'):
        if max_velocity <= 30:
            self.inst.velocity_max(max_velocity)
        self.inst.move_direction(direction)
        print('jog finished')
        return self.position_info(print = False)
    
    def stop(self, print: bool = True):
        self.inst.stop()
        self.position_info(print = True)
        return self.current_pos
    
    def movement_with_info(self, deg: float, max_velocity: float = 10):
        import time

      
        self.inst.position_async(deg)

  
        while self.current_pos < deg:
            self.position_info(print = True)
            time.sleep(0.25)
        
      
        # self.inst.stop()

        # print("Position:", self.inst.position())



In [None]:
import time

def rotate_until_max_power(meter, rotator, shutter, direction='fwd', poll_interval=0.1, max_wait=60):
    """
    Запускает вращение rotator в указанном направлении ('fwd' или 'rvs'),
    отслеживает показания мощности через meter.get_power(),
    при достижении максимума останавливает вращатель и открывает shutter.

    Параметры:
    - meter: объект мощиметра с методом get_power()
    - rotator: объект вращателя с методами jog(direction) и stop()
    - shutter: объект затвора с методом open()
    - direction: направление вращения ('fwd' или 'rvs')
    - poll_interval: время между измерениями мощности (сек)
    - max_wait: максимальное время ожидания в секундах (чтобы не зациклиться)
    """

    rotator.jog(direction)  # Начинаем вращение
    print(f"Rotator started moving {direction}")
    tol = 1e-2
    max_power = -float('inf')
    max_power_time = time.time()
    stable_count = 0  # счётчик стабильных максимальных значений
    max_power_value = None

    start_time = time.time()

    while True:
        if time.time() - start_time > max_wait:
            print("Timeout reached, stopping rotation.")
            break

        power = meter.get_power()
        if power is None:
            print("Warning: meter returned None power value. Retrying...")
            time.sleep(poll_interval)
            continue

        # Обновляем максимум если нашли большее значение мощности
        if (np.abs(power - max_power) < tol and (stable_count >= 2)) or power > max_power:
            max_power = power
            max_power_time = time.time()
            stable_count = 0
            max_power_value = power
        else:

            stable_count += 1
            if stable_count >= 3:  
                print(f"Maximum power detected: {max_power_value:.4f} W")
                break

        time.sleep(poll_interval)

    rotator.stop()
    print("Rotator stopped.")
    
    shutter.open()
    print("Shutter opened.")

    return max_power_value


## Shutter code

In [2]:
class ShutterController:
    def __init__(self, serial_num, device_manager, device_creator):
        self.serial_num = serial_num
        self.device_manager = device_manager
        self.device_creator = device_creator
        self.device = None

    def initialize(self):
        self.device_manager.BuildDeviceList()
        self.device = self.device_creator.CreateKCubeSolenoid(self.serial_num)
        self.device.Connect(self.serial_num)
        if not self.device.IsSettingsInitialized():
            self.device.WaitForSettingsInitialized(10000)
        assert self.device.IsSettingsInitialized() is True
        self.device.StartPolling(250)
        time.sleep(0.25)
        self.device.EnableDevice()
        time.sleep(0.5)
        print(self.device.GetDeviceInfo().Description)
        self.device.SetOperatingMode(self.device.SolenoidStatus.OperatingModes.Manual)
        print("Shutter is ready")

    def open(self):
        if self.device:
            self.device.SetOperatingState(self.device.SolenoidStatus.OperatingStates.Active)
            print("Shutter opened")

    def close(self):
        if self.device:
            self.device.SetOperatingState(self.device.SolenoidStatus.OperatingStates.Inactive)
            print("Shutter closed")




In [None]:
class SineAngleCalculator:
    @staticmethod
    def sine_function(x, A, B, C, D):
        return A * np.sin(B * x + C) + D

    @staticmethod
    def fit_sine_curve(angles_deg, values):
        angles_rad = np.radians(angles_deg)
        initial_guess = [max(values), 4.0, 0.0, 0.0]
        params, _ = curve_fit(SineAngleCalculator.sine_function, angles_rad, values, p0=initial_guess)
        return params

    @staticmethod
    def calculate_angle_from_power(A, B, C, D, power_val):
        return (np.arcsin((power_val - D) / A) - C) / B * 180 / np.pi

    @staticmethod
    def find_angle_for_sinusoid_value(A, freq, phase, y_shift, y_target, degrees=True):
        normalized = (y_target - y_shift) / A
        if not -1 <= normalized <= 1:
            raise ValueError("Target power is outside sine range")
        angle = np.arcsin(normalized)
        angle = (angle - phase) / freq
        return np.degrees(angle) if degrees else angle




# Service classes

In [3]:
class MeasurementService:
    def __init__(self, picoharp_device, powermeter, kinesys, shutter):
        self.picoharp = picoharp_device
        self.powermeter = powermeter
        self.kinesys = kinesys
        self.shutter = shutter

    def measure_time_dependent(self, tacq, duration_s, filename, rotation_flag):
        with open(f"{filename}_{tacq}ms_{rotation_flag}rotation.txt", 'w') as f:
            phlib = ctypes.CDLL("phlib64.dll")
            start_time = time.time()
            end_time = start_time + duration_s
            t_open = start_time + 7
            countrate = []
            intensities = []
            times = []
            powers = []
            times_power = []
            j_flag = True
            i = 0
            while time.time() < end_time:
                if time.time() > t_open and rotation_flag == 0 and j_flag:
                    self.shutter.open()
                    j_flag = False
                s = time.time()
                self.picoharp.tryfunc(phlib.PH_GetCountRate(ctypes.c_int(self.picoharp.dev[0]),
                                                            ctypes.c_int(1),
                                                            ctypes.byref(self.picoharp.countRate1)),
                                     "GetCountRate")
                countrate.append(self.picoharp.countRate1.value)
                intensities.append(self.picoharp.Measurement(tacq))
                e = time.time()
                times.append(e - start_time)
                powers.append(self.powermeter.get_power())
                times_power.append(time.time() - start_time)
                f.write(f"Intensity: {intensities[i]:.1f}, Countrate: {countrate[i]:.1f}, Power: {powers[i]}, "
                        f"Start Time: {s}, End Time: {e}\n")
                i += 1
            return intensities, times, countrate, powers, times_power

    def plot_results(self, times, intensities, countrate, powers, times_power, save_path):
        fig, axs = plt.subplots(5, figsize=(10, 15))
        axs[0].plot(times, intensities, '-o')
        axs[0].set_xlabel('Time (s)')
        axs[0].set_ylabel('Integral intensity (counts)')
        axs[1].plot(times, countrate, '-o')
        axs[1].set_xlabel('Time (s)')
        axs[1].set_ylabel('Count rate (counts/s)')
        axs[2].plot(times_power, powers, '-o')
        axs[2].set_xlabel('Time (s)')
        axs[2].set_ylabel('Power (W)')
        axs[3].plot(powers, countrate, '-o')
        axs[3].set_xlabel('Excitation power (W)')
        axs[3].set_ylabel('SHG power (W)')
        powers_interp = np.interp(np.array(times) - 0.14, np.array(times), np.array(powers))
        axs[4].plot(powers_interp, countrate, '-o')
        axs[4].set_xlabel('Excitation power (W)')
        axs[4].set_ylabel('SHG power (W)')
        plt.tight_layout()
        plt.savefig(save_path)
        plt.show()


class NotificationService:
    @staticmethod
    def sound_notification(frequency=1000, duration_ms=1000):
        try:
            import winsound
            winsound.Beep(frequency, duration_ms)
        except ImportError:
            print("Sound notification not supported on this platform")


class VisualizationService:
    @staticmethod
    def plot_2d_color_map(data, cmap='viridis', title='2D Color Map'):
        plt.figure(figsize=(8, 6))
        cax = plt.imshow(data, cmap=cmap, aspect='auto')
        plt.colorbar(cax)
        plt.title(title)
        plt.xlabel('X-axis')
        plt.ylabel('Y-axis')
        plt.show()

