Skip to content

Commit

Permalink
Yokogawa 6370 expansion (#353)
Browse files Browse the repository at this point in the history
* Yokogawa 6370 expanded, tests added accordingly.

* Allow to retrieve just a limited subset of a trace's data.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Data and wavelength unified.

Tests expanded.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Changes from review: documentation and tests.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
BenediktBurger and pre-commit-ci[bot] committed Nov 21, 2022
1 parent e583cad commit 00fb219
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 22 deletions.
122 changes: 102 additions & 20 deletions src/instruments/yokogawa/yokogawa6370.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
unitless_property,
bounded_unitful_property,
ProxyList,
string_property,
)


Expand All @@ -27,9 +28,7 @@ class Yokogawa6370(OpticalSpectrumAnalyzer):

"""
The Yokogawa 6370 is a optical spectrum analyzer.
Example usage:
>>> import instruments as ik
>>> import instruments.units as u
>>> inst = ik.yokogawa.Yokogawa6370.open_visa('TCPIP0:192.168.0.35')
Expand All @@ -47,9 +46,7 @@ class Channel(OpticalSpectrumAnalyzer.Channel):

"""
Class representing the channels on the Yokogawa 6370.
This class inherits from `OpticalSpectrumAnalyzer.Channel`.
.. warning:: This class should NOT be manually created by the user. It
is designed to be initialized by the `Yokogawa6370` class.
"""
Expand All @@ -60,19 +57,44 @@ def __init__(self, parent, idx):

# METHODS #

def data(self, bin_format=True):
cmd = f":TRAC:Y? {self._name}"
def _data(self, axis, limits=None, bin_format=True):
"""Get data of `axis`.
:param axis: Axis to get the data of, "X" or "Y"
:param limits: Range of samples to transfer as a tuple of min and
max value, e.g. (5, 100) transfers data from the fifth to the
100th sample. The possible values are from 0 to 50000.
"""
if limits is None:
cmd = f":TRAC:{axis}? {self._name}"
elif isinstance(limits, (tuple, list)) and len(limits) == 2:
cmd = f":TRAC:{axis}? {self._name},{limits[0]+1},{limits[1]+1}"
else:
raise ValueError("limits has to be a list or tuple with two members")
self._parent.sendcmd(cmd)
data = self._parent.binblockread(data_width=8, fmt="<d")
self._parent._file.read_raw(1) # pylint: disable=protected-access
return data

def wavelength(self, bin_format=True):
cmd = f":TRAC:X? {self._name}"
self._parent.sendcmd(cmd)
data = self._parent.binblockread(data_width=8, fmt="<d")
self._parent._file.read_raw(1) # pylint: disable=protected-access
return data
def data(self, limits=None, bin_format=True):
"""
Return the trace's level data.
:param limits: Range of samples to transfer as a tuple of min and
max value, e.g. (5, 100) transfers data from the fifth to the
100th sample. The possible values are from 0 to 50000.
"""
return self._data("Y", limits=limits, bin_format=bin_format)

def wavelength(self, limits=None, bin_format=True):
"""
Return the trace's wavelength data.
:param limits: Range of samples to transfer as a tuple of min and
max value, e.g. (5, 100) transfers data from the fifth to the
100th sample. The possible values are from 0 to 50000.
"""
return self._data("X", limits=limits, bin_format=bin_format)

# ENUMS #

Expand Down Expand Up @@ -100,20 +122,56 @@ class Traces(Enum):

# PROPERTIES #

# General

id = string_property(
"*IDN",
doc="""
Get the identification of the device.
Output: 'Manufacturer,Product,SerialNumber,FirmwareVersion'
Sample: 'YOKOGAWA,AQ6370D,90Y403996,02.08'
""",
readonly=True,
)

status = unitless_property(
"*STB",
doc="""The status byte of the device.
Bit 7: Summary bit of operation status
Bit 5: Summary bit of standard event status register
Bit 4: “1” if the output buffer contains data
Bit 3: Summary bit of questionable status
""",
readonly=True,
)

operation_event = unitless_property(
":status:operation:event",
doc="""
All changes after the last readout. Readout clears the operation_event
Bit 4: Autosweep
Bit 3: Calibration/Alignment
Bit 2: Copy/File
Bit 1: Program
Bit 0: Sweep finished.
""",
readonly=True,
)

@property
def channel(self):
"""
Gets the specific channel object.
This channel is accessed as a list in the following manner::
>>> import instruments as ik
>>> osa = ik.yokogawa.Yokogawa6370.open_gpibusb('/dev/ttyUSB0')
>>> dat = osa.channel["A"].data # Gets the data of channel 0
:rtype: `list`[`~Yokogawa6370.Channel`]
"""
return ProxyList(self, Yokogawa6370.Channel, Yokogawa6370.Traces)

# Sweep

start_wl, start_wl_min, start_wl_max = bounded_unitful_property(
":SENS:WAV:STAR",
u.meter,
Expand Down Expand Up @@ -172,6 +230,10 @@ def channel(self):
Effective only after a self.start_sweep().""",
)

# Analysis

# Traces

active_trace = enum_property(
":TRAC:ACTIVE",
Traces,
Expand All @@ -182,22 +244,42 @@ def channel(self):

# METHODS #

def data(self):
def data(self, limits=None):
"""
Function to query the active Trace data of the OSA.
:param limits: Range of samples to transfer as a tuple of min and
max value, e.g. (5, 100) transfers data from the fifth to the
100th sample. The possible values are from 0 to 50000.
"""
return self.channel[self.active_trace].data()
return self.channel[self.active_trace].data(limits=limits)

def wavelength(self):
def wavelength(self, limits=None):
"""
Query the wavelength axis of the active trace.
:param limits: Range of samples to transfer as a tuple of min and
max value, e.g. (5, 100) transfers data from the fifth to the
100th sample. The possible values are from 0 to 50000.
"""
return self.channel[self.active_trace].wavelength()
return self.channel[self.active_trace].wavelength(limits=limits)

def analysis(self):
"""Get the analysis data."""
return [float(x) for x in self.query(":CALC:DATA?").split(",")]

def start_sweep(self):
"""
Triggering function for the Yokogawa 6370.
After changing the sweep mode, the device needs to be triggered before it will update.
After changing the sweep mode, the device needs to be triggered before
it will update.
"""
self.sendcmd("*CLS;:init")

def abort(self):
"""Abort a running sweep or calibration etc."""
self.sendcmd(":ABORT")

def clear(self):
"""Clear status registers."""
self.sendcmd("*CLS")
123 changes: 121 additions & 2 deletions tests/test_yokogawa/test_yokogawa_6370.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from tests import (
expected_protocol,
iterable_eq,
pytest,
)
from instruments.units import ureg as u

Expand All @@ -35,6 +36,88 @@ def test_init():
pass


def test_id():
with expected_protocol(
ik.yokogawa.Yokogawa6370,
[":FORMat:DATA REAL,64", "*IDN?"],
["'YOKOGAWA,AQ6370D,x,02.08'"],
) as inst:
assert inst.id == "YOKOGAWA,AQ6370D,x,02.08"


def test_status():
with expected_protocol(
ik.yokogawa.Yokogawa6370, [":FORMat:DATA REAL,64", "*STB?"], ["7"]
) as inst:
assert inst.status == 7


def test_operation_event():
with expected_protocol(
ik.yokogawa.Yokogawa6370,
[":FORMat:DATA REAL,64", ":status:operation:event?"],
["7"],
) as inst:
assert inst.operation_event == 7


@pytest.mark.parametrize("axis", ("X", "Y"))
@given(
values=st.lists(st.floats(allow_infinity=False, allow_nan=False), min_size=1),
channel=st.sampled_from(ik.yokogawa.Yokogawa6370.Traces),
)
def test_channel_private_data_wo_limits(values, channel, axis):
values_packed = b"".join(struct.pack("<d", value) for value in values)
values_len = str(len(values_packed)).encode()
values_len_of_len = str(len(values_len)).encode()
with expected_protocol(
ik.yokogawa.Yokogawa6370,
[
":FORMat:DATA REAL,64",
f":TRAC:{axis}? {channel.value}",
],
[b"#" + values_len_of_len + values_len + values_packed],
) as inst:
values = tuple(values)
if numpy:
values = numpy.array(values, dtype="<d")
iterable_eq(inst.channel[channel]._data(axis), values)


@pytest.mark.parametrize("axis", ("X", "Y"))
@given(
values=st.lists(st.floats(allow_infinity=False, allow_nan=False), min_size=1),
channel=st.sampled_from(ik.yokogawa.Yokogawa6370.Traces),
start=st.integers(0, 25000),
length=st.integers(0, 25000),
)
def test_channel_private_data_with_limits(values, channel, axis, start, length):
values_packed = b"".join(struct.pack("<d", value) for value in values)
values_len = str(len(values_packed)).encode()
values_len_of_len = str(len(values_len)).encode()
with expected_protocol(
ik.yokogawa.Yokogawa6370,
[
":FORMat:DATA REAL,64",
f":TRAC:{axis}? {channel.value},{start+1},{start+1+length}",
],
[b"#" + values_len_of_len + values_len + values_packed],
) as inst:
values = tuple(values)
if numpy:
values = numpy.array(values, dtype="<d")
iterable_eq(inst.channel[channel]._data(axis, (start, start + length)), values)


@pytest.mark.parametrize("limits", ([5], "abc", (7,), 3))
def test_channel_private_data_limit_error(limits):
with expected_protocol(
ik.yokogawa.Yokogawa6370, [":FORMat:DATA REAL,64"], []
) as inst:
with pytest.raises(ValueError):
inst.channel["A"]._data("X", limits)


@given(
values=st.lists(st.floats(allow_infinity=False, allow_nan=False), min_size=1),
channel=st.sampled_from(ik.yokogawa.Yokogawa6370.Traces),
Expand Down Expand Up @@ -69,14 +152,14 @@ def test_channel_wavelength(values, channel):
ik.yokogawa.Yokogawa6370,
[
":FORMat:DATA REAL,64",
f":TRAC:X? {channel.value}",
f":TRAC:X? {channel.value},1,501",
],
[b"#" + values_len_of_len + values_len + values_packed],
) as inst:
values = tuple(values)
if numpy:
values = numpy.array(values, dtype="<d")
iterable_eq(inst.channel[channel].wavelength(), values)
iterable_eq(inst.channel[channel].wavelength((0, 500)), values)


@given(value=st.floats(min_value=600e-9, max_value=1700e-9))
Expand Down Expand Up @@ -263,3 +346,39 @@ def test_start_sweep():
[],
) as inst:
inst.start_sweep()


def test_analysis():
with expected_protocol(
ik.yokogawa.Yokogawa6370,
[
":FORMat:DATA REAL,64",
":CALC:DATA?",
],
["1,2,3,7.3,3.12314,.2345"],
) as inst:
assert inst.analysis() == [1, 2, 3, 7.3, 3.12314, 0.2345]


def test_abort():
with expected_protocol(
ik.yokogawa.Yokogawa6370,
[
":FORMat:DATA REAL,64",
":ABORT",
],
[],
) as inst:
inst.abort()


def test_clear():
with expected_protocol(
ik.yokogawa.Yokogawa6370,
[
":FORMat:DATA REAL,64",
"*CLS",
],
[],
) as inst:
inst.clear()

0 comments on commit 00fb219

Please sign in to comment.