In [2]:
import pyvisa as pyv
import csv
import struct
import os

class WaveformGenerator:
    def __init__(self, resource_string: str):
        self.rm = pyv.ResourceManager()
        self.device = self.rm.open_resource(resource_string)
        response = self.device.query('*IDN?')
        print("Instrument ID:", response)

    def send_command(self, command: str) -> None:
        print(f"Sending command: {command}")
        self.device.write(command)

    def query_command(self, command: str) -> str:
        print(f"Querying command: {command}")
        return self.device.query(command)

    def set_waveform_type(self, channel: int, waveform_type: str) -> None:
        self.send_command(f'C{channel}:BSWV WVTP,{waveform_type}')

    def set_frequency(self, channel: int, frequency: float) -> None:
        self.send_command(f'C{channel}:BSWV FRQ,{frequency}')

    def set_amplitude(self, channel: int, amplitude: float) -> None:
        self.send_command(f'C{channel}:BSWV AMP,{amplitude}')

    def set_phase(self, channel: int, phase: float) -> None:
        self.send_command(f'C{channel}:BSWV PHSE,{phase}')

    def start_waveform(self, channel: int) -> None:
        self.send_command(f'C{channel}:OUTP ON')

    def stop_waveform(self, channel: int) -> None:
        self.send_command(f'C{channel}:OUTP OFF')

    def set_arbitrary_waveform_by_name(self, channel: int, name: str) -> None:
        self.send_command(f'C{channel}:ARWV NAME,{name}')

    def query_arbitrary_waveform(self, channel: int) -> str:
        return self.query_command(f'C{channel}:ARWV?')

    def convert_csv_to_binary(self, csv_filename: str, bin_filename: str) -> None:
        with open(csv_filename, 'r') as csvfile, open(bin_filename, 'wb') as binfile:
            reader = csv.reader(csvfile)
            next(reader)  # Skip the header row
            for row in reader:
                float_row = list(map(float, row))
                bin_row = struct.pack('f' * len(float_row), *float_row)
                binfile.write(bin_row)
        print(f"Converted {csv_filename} to {bin_filename}")

    def save_binary_waveform_to_device(self, bin_filename: str) -> None:
        waveform_name = os.path.splitext(os.path.basename(bin_filename))[0]  # Use filename without extension as the waveform name
        with open(bin_filename, 'rb') as binfile:
            data = binfile.read()
            self.device.write_binary_values(f'C1:WVDT WVNM,{waveform_name},WAVEDATA,', data, datatype='B')
        print(f"Binary waveform {bin_filename} saved to device as {waveform_name}.")

    def upload_and_generate_waveform(self, channel: int, csv_filename: str):
        bin_filename = os.path.splitext(csv_filename)[0] + '.bin'
        self.convert_csv_to_binary(csv_filename, bin_filename)
        self.save_binary_waveform_to_device(bin_filename)
        waveform_name = os.path.splitext(os.path.basename(bin_filename))[0]  # Use filename without extension as the waveform name
        self.set_arbitrary_waveform_by_name(channel, waveform_name)
        self.start_waveform(channel)
        print(f"Waveform from {csv_filename} is now being generated on channel {channel}")

def get_user_input(prompt: str, valid_options=None):
    while True:
        value = input(prompt).strip()
        if not valid_options or value in valid_options:
            return value
        print(f"Invalid input. Valid options are: {', '.join(valid_options)}")

def waveform_config(ip_string: str = None):
    if ip_string is None:
        connection_type = get_user_input("Enter connection type (IP/USB): ", ["IP", "USB"]).upper()
        ip_address = input("Enter IP address: ").strip()
        if connection_type == "IP":
            ip_string = f"TCPIP::{ip_address}::INSTR"
        elif connection_type == "USB":
            ip_string = f"USB::{ip_address}::INSTR"
        else:
            print("Invalid connection type.")
            return

    generator = WaveformGenerator(ip_string)

    while True:
        try:
            channel = int(get_user_input("Enter the channel number (1/2): ", ["1", "2"]))
            if get_user_input("Do you want to upload a file? (yes/no): ", ["yes", "no"]) == "yes":
                csv_filename = input("Enter the CSV filename: ").strip()
                generator.upload_and_generate_waveform(channel, csv_filename)
            else:
                waveform_type = get_user_input("Enter a waveform type (sine, square, ramp, pulse, noise, DC, PRBS, arb): ").upper()
                if waveform_type == "ARB":
                    name = input("Enter the name of the arbitrary waveform: ")
                    generator.set_arbitrary_waveform_by_name(channel, name)
                    print(f"Current waveform: {generator.query_arbitrary_waveform(channel)}")
                else:
                    generator.set_waveform_type(channel, waveform_type)
                    generator.set_frequency(channel, float(input("Set frequency in Hz: ")))
                    generator.set_amplitude(channel, float(input("Set amplitude: ")))
                    print("Starting waveform")
                    generator.start_waveform(channel)

            if get_user_input("Do you want to stop the waveform output? (yes/no): ", ["yes", "no"]) == "yes":
                generator.stop_waveform(1)
                generator.stop_waveform(2)
                break

            if get_user_input("Do you want to reconfigure, or use another channel? (yes/no): ", ["yes", "no"]) != "yes":
                break

        except ValueError as e:
            print(f"Invalid input or error: {e}")

if __name__ == "__main__":
    waveform_config()


Instrument ID: Siglent Technologies,SDG6052X,SDG6XFCQ6R0831,6.01.01.36R3

Invalid input. Valid options are: yes, no
Invalid input. Valid options are: yes, no
Invalid input. Valid options are: yes, no


KeyboardInterrupt: Interrupted by user