In [None]:
import os
import warnings

class Usbtmc:
    """Simple implementation of a USBTMC device driver.
    
    >> The USBTMC (USB Test and Measurement Class) and USBTMC-USB488 are two related USB (Universal Serial Bus) class specifications. They were officially adopted on December 22, 2002. The purpose of USBTMC is to describe the requirements for test-and-measurement devices with a USB interface. USBTMC defines the protocol for exchanging messages between hosts and devices.
      
    https://www.eetimes.com/usbtmc-unwrapped/#
    """ 
    def __init__(self, device: [str]="/dev/usbtmc0"):
        self.device = device
        self.channels = list()
        self.reset()

    @property
    def __file__(self):
        os_open_configuration = {
            "path": self.device,
            "flags": os.O_RDWR | os.O_DIRECT | os.O_SYNC 
        }
        __file = os.open(
            self.device,
            os.O_RDWR
        )
        return __file
        # TODO: Test that the__file__ opened
 
    def write(self, command: [bytes, str], response=None):
        """write a string to the usbtmc device.
        
        Example:
        
        """
        if isinstance(command, str):
            command = command.encode()
        os.write(self.__file__, command);
        if response is not None:
            return self.read(self.response)
 
    def read(self, length = 1024):
        return os.read(self.__file__, length).decode()
 
    def name(self):
        """# *IDN?
        Command Format:
            *IDN?
        Function Explanation:
            The command queries the ID character string of the instrument, including a field
            separated by 4 commas: manufactory, model, serial number and the version
            number composed of numbers and separated by “.” .
        Returned Format:
            RIGOL TECHNOLOGIES,<model>,<serial number>, <software version>.
        Example:
            RIGOL TECHNOLOGIES,DS1102E,DS1EB104702974,00.02.01.01.00
        """
        self.write("*IDN?")
        return self.read(300)
 
    def reset(self):
        """The command resets the system parameters."""
        warnings.warn(f"Resetting: {self.name()}")
        self.write("*RST")
        
 
    def __repr__(self):
        return f"{self.__class__.__name__}<{self.device}>"
    
    def __enter__(self):
        print("in __enter__")
        return self
    def __exit__(self, exception_type, exception_value, traceback):
        print("in __exit__")       

In [2]:
device = Usbtmc()
device



Usbtmc</dev/usbtmc0>

In [3]:
device.name()

'Rigol Technologies,DS1052D,DS1EU152500705,00.04.02.01.00'

In [4]:
device.reset()



In [5]:
from cached_property import cached_property
class RigolScope:
    """Class to control a Rigol DS1000 series oscilloscope"""
    def __init__(self, *args, device: [Usbtmc, str]="/dev/usbtmc0"):
        if len(args)>0:
            raise Exception("No.")
        if isinstance(device, str):
            device = Usbtmc(device)
        self.device = device
 
    def write(self, command):
        """Send an arbitrary command directly to the scope"""
        self.device.write(command)
 
    def read(self, command):
        """Read an arbitrary amount of data directly from the scope"""
        return self.device.read(command)
 
    def reset(self):
        """Reset the instrument"""
        self.device.reset()
        
    @cached_property
    def __idn__(self):
        return self.device.name()
    
    @property
    def model(self):
        return self.__idn__.split(",")[1]
    
    @property
    def vendor(self):
        return self.__idn__.split(",")[0]
    
    @property
    def serial(self):
        return self.__idn__.split(",")[2]
    
    @property
    def version(self):
        return self.__idn__.split(",")[3]
    
    def __repr__(self):
        return f"{self.__class__.__name__}<{self.model}>"
    
    def __enter__(self):
        print("in __enter__")
        return self
    def __exit__(self, exception_type, exception_value, traceback):
        print("in __exit__")

r = RigolScope()
r



RigolScope<DS1052D>

In [6]:
device = r.device

In [7]:
self = scope = r

In [8]:
scope.device

Usbtmc</dev/usbtmc0>

In [9]:
assert scope.vendor=="Rigol Technologies"
assert scope.model=="DS1052D"
assert scope.version=="00.04.02.01.00"

In [None]:
imp

In [10]:
with open("../python_Rigol/DS1000DE_ProgrammingGuide_EN.txt", "r") as file:
    DS1000DE_ProgrammingGuide_EN = file.read()
command_re = re.compile("(:SYSTEM:[\w]+)")
for result in command_re.findall(DS1000DE_ProgrammingGuide_EN):
    if result.startswith(":SYSTEM:"):
        print(result)

NameError: name 're' is not defined

In [None]:
class CommandsFactory:
    def __init__(self, *args, **kwargs):
        if len(args)>0:
            raise Exception("No.")
            
        for key, value in kwargs.items():
            setattr(self, key, value)
    
    def __str__(self):
        return f"{self.__class__.__name__.upper()}"

In [11]:
System=type("System", (CommandsFactory, ), {})

NameError: name 'CommandsFactory' is not defined

In [None]:
def key_factory(key_str:str, wait: int=0):
    def press_key(self):
        command_string = f":{self}:{key_str}"
        print(command_string)
        self.device.write(f":{self}:{key_str}")
    return press_key
                          
                          
def key_factory2(key_str:str, wait: int=0):
    key_str = key_str.strip().strip(":").split(":")[-1].upper()
    key_function = key_factory(key_str, wait)                   
    return key_str.lower(), key_function

In [None]:
def key_factory3(subsystem, command):
    return f":{subsystem.upper()}:{command.upper()}"

In [None]:
key_factory3("SYSTEM", "RUN")

In [None]:
key_factory2(key_factory3("SYSTEM", "RUN"))

In [None]:
subsystem_commands = {
    "System": ["RUN", "STOP", "AUTO", "HARDCOPY"]
}
for subsystem, subsystem_commands in subsystem_commands.items():
    print(f"{subsystem}")
    print(f"{subsystem_commands}")
    command_dictionary = dict()
    for command in subsystem_commands:
        subsystem_command, subsystem_command_function = key_factory2(key_factory3(subsystem, command))
        command_dictionary[subsystem_command]=subsystem_command_function
    subsystem_class=type(subsystem, (CommandsFactory, ), command_dictionary)
    setattr(RigolScope, subsystem.lower(), subsystem_class())


In [None]:
f = RigolScope.system

In [None]:
f

In [None]:
scope.device

In [None]:
setattr(scope.system, "device", scope.device)

In [None]:
scope.system.run()

In [None]:
class CommandsFactory:
    def __init__(self, *args, **kwargs):
        if len(args)>0:
            raise Exception("No.")
            
        for key, value in kwargs.items():
            setattr(self, key, value)
    
    def __str__(self):
        return f"{self.__class__.__name__.upper()}"

In [None]:
ch

In [None]:
subsystem_commands = {
    "System": ["RUN", "STOP", "AUTO", "HARDCOPY"]
}
for subsystem, subsystem_commands in subsystem_commands.items():
    print(f"{subsystem}")
    print(f"{subsystem_commands}")
    command_dictionary = dict()
    for command in subsystem_commands:
        subsystem_command, subsystem_command_function = key_factory2(key_factory3(subsystem, command))
        command_dictionary[subsystem_command]=subsystem_command_function
    subsystem_class=type(subsystem, (CommandsFactory, ), command_dictionary)
    setattr(RigolScope, subsystem.lower(), subsystem_class())


In [None]:
f.run()

In [None]:
f.stop()

In [None]:
scope.system.auto()

In [None]:
scope.system.start()

In [None]:
command_dictionary

In [None]:
RigolScope.SYSTem.RU

In [None]:
def 

In [None]:
measurement.vpp()

In [None]:
command_re = re.compile(":([^?]+)\? \[<source")
for result in command_re.findall(DS1000DE_ProgrammingGuide_EN):
    result = result.strip()
    if result.startswith(":MEASure:"):
        attr_name, attr_get_function=measurement_factory2(result)
        setattr(Measure, attr_name, property(attr_get_function))

In [None]:
measurement.vpp

In [None]:
channel.measurement.vpp

In [None]:
measurement_factory(":MEASure:VPP?")

In [None]:
measurement_factory(":MEASure:VPP?")

In [None]:
measurement_factory2(":MEASure:VPP")

In [None]:
setattr(Measure, )

In [None]:
with open("../python_Rigol/DS1000DE_ProgrammingGuide_EN.txt", "r") as file:
    DS1000DE_ProgrammingGuide_EN = file.read()

In [None]:
import re

In [None]:
me

In [None]:
command_re = re.compile(":([^?]+)\? \[<source")
for result in command_re.findall(DS1000DE_ProgrammingGuide_EN):
    result = result.strip()
    if result.startswith(":MEASure:"):
        attr_name, attr_get_function=measurement_factory2(result)
        setattr(Measure, attr_name, attr_get_function)

In [None]:
measurement.vpp()

In [None]:
setattr(Channel, "vpp", measurement_factory(":MEASure:VPP?"))


In [None]:
c.device.write("ac")
float(c.device.read())

In [None]:
result

In [None]:
print(DS1000DE_ProgrammingGuide_EN)

# System


In [None]:
import re

In [None]:
command_re = re.compile("(:SYSTEM:[\w]+)")
for result in command_re.findall(DS1000DE_ProgrammingGuide_EN):
    if result.startswith(":SYSTEM"):
        print(result)

# Keys

In [None]:
command_re = re.compile(":([^?]+)\? \[<source")
for result in command_re.findall(DS1000DE_ProgrammingGuide_EN):
    result = result.strip()
    if result.startswith(":MEASure:"):
        attr_name, attr_get_function=measurement_factory2(result)
        setattr(Measure, attr_name, attr_get_function)

In [None]:
class CommandsFactory:
    def __init__(self):
        pass
    
    def __str__(self):
        return f"{self.__class__.__name__.upper()}"

In [None]:
Key=type("Key", (CommandsFactory, ), {})

In [None]:
key = Key()

In [None]:
f"{key}"

In [None]:
command_re = re.compile("(:KEY:[\w]+)")
for result in command_re.findall(DS1000DE_ProgrammingGuide_EN):
    if result.startswith(":KEY"):
        print(result)
        break

In [None]:
x = lambda y, z: print(y, z)

In [None]:
setattr(Key, "foo", x)

In [None]:
key.foo

In [None]:
key.foo(4)

In [None]:
x = lambda subsystem, command: print(f"{subsystem}:{command}")
zip

In [None]:
def key_factory

In [None]:
def measurement_factory(query_string:str, length:int=1024):
    query_string=str(query_string)
    query_string=query_string.strip()
    if not query_string.endswith("?"):
        query_string=f"{query_string}?"
    def get_measurement(self):
        self.device.write(f"{query_string} {self.channel}")
        response = self.device.read(length=length)
        try:
            return float(response)
        except:
            return response
        
    return get_measurement

def measurement_factory2(query_string:str, length:int=1024):
    attr_name = query_string.strip("?").split(":")[-1].lower()
    return attr_name, measurement_factory(query_string, length)
