In [36]:
!pip install pyvisa pandas pyvisa-py  > /dev/null
!pip install ipywidgets > /dev/null

# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import math
import os
import re
import numpy as np
import pyvisa
import time
import pickle
import pandas as pd
from IPython.display import display, HTML
%matplotlib widget

base_path = "."

class GobalState:
    def __init__(self, state_file = 'state.pkl', default={}):
        self.state_file = state_file
        self.state = default
        self.loaded = False
    
    def __del__(self):
        if self.loaded:
            self.save()
            
    def load(self):
        if not os.path.exists(self.state_file):
            self.save()
        pkl_file = open(self.state_file, "rb")
        self.state = pickle.load(pkl_file)
        self.loaded = True
        pkl_file.close()
    
    def save(self):
        pkl_file = open(self.state_file, "wb")
        pickle.dump(self.state, pkl_file)
        pkl_file.close()
    
    def get_or_set(self, name, default=None):
        value = self.get(name)
        if value:
            return value
        return self.set(name, default)
        
    def get(self, name, default=None):
        if not self.loaded:
            self.load()
        return self.state.get(name, default)

    def unset(self, name):
        if not self.loaded:
            self.load()
        if name in self.state:
            del self.state[name]

    def set(self, name, value):
        if not self.loaded:
            self.load()
        self.state[name] = value
        return value 

global_state = GobalState()

class LogRecorder():
    def __init__(self, base_path, ip):
        self.base_path = base_path
        self.ip = ip
        self.dmm = None
        self.running = False
        self._csv_file = None
        
    def __del__(self):
       self.close_csv_file()
 
    def play_sound(self):
        if os.path.exists("/System/Library/Sounds/Glass.aiff"):
            os.system("afplay /System/Library/Sounds/Glass.aiff")

    def close_csv_file(self):
        if self._csv_file:
            self._csv_file.close()
            self._csv_file = None

    def open_csv_file(self, prefix="", postfix=""):
        if not self._csv_file:
            filename = self.find_next_log_filename(postfix, prefix)
            display("Create CSV file: " + filename)
            self._csv_file = open(filename, 'w')

    def write_csv(self, row):
        if not self._csv_file:
            self.open_csv_file()
            
        for line in row:          
            self._csv_file.write(f"{line}\r\n")
  
    def find_next_log_filename(self, postfix="", prefix="LOG"):
        max_index = 0
        today = datetime.now().strftime("%d.%m.%Y")
        folder_name = os.path.join(self.base_path, today, "JUPYTER")
        if not os.path.exists(folder_name):
            os.makedirs(folder_name, exist_ok=True)
        pattern = rf"{prefix}(\d{{5}})(.*?)\.csv"
        for fname in os.listdir(folder_name):
            match = re.match(pattern, fname)
            print(f"Checking file: {fname}, pattern: {pattern}, match: {match}")
            if match:
                idx = int(match.group(1))
                if idx >= max_index:
                    max_index = idx + 1
        return os.path.join(folder_name, '{:s}{:05d}{:s}.csv'.format(str(prefix), max_index, str(postfix)))
    
    # https://www.testunlimited.com/pdf/an/5990-3515en.pdf
    # 0.006 PLC | 6.0 ppm x Range | MAX  (for 34410A)
    # 0.02 PLC | 3.0 ppm x Range
    # 0.06 PLC | 1.5 ppm x Range |
    # 0.2 PLC | 0.7 ppm x Range |
    # 1 PLC (default) | 0.3 ppm x Range | DEF
    # 2 PLC | 0.2 ppm x Range |
    # 10 PLC | 0.1 ppm x Range |
    # 100 PLC | 0.03 ppm x Range | MIN

    def start(self, config, plc, range, duration, chunk_size, progress=None):
        rm = pyvisa.ResourceManager()
        t_start = time.time()
        t_start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t_start))
        try:
            self.open_csv_file(prefix="DMM") 
            self.dmm = rm.open_resource(f"TCPIP::{self.ip}::INSTR")
            self.dmm.timeout = 60000
            display(f"Підключено: {self.ip}, {self.dmm.query("*IDN?")}")
            self.dmm.write("*RST")
            self.dmm.write("*CLS")
            # self.dmm.write("DISP OFF")
            self.dmm.write(f"CONF:{config} {range}")
                  
            self.dmm.write(f"SENS:{config}:RES MAX")
            if range == "AUTO":
                self.dmm.write(f"SENS:{config}:RANGE:AUTO ON")
            else:
                self.dmm.write(f"SENS:{config}:RANGE:AUTO OFF")
                self.dmm.write(f"SENS:{config}:RANGE {range}")

            self.dmm.write(f"SENS:{config}:ZERO:AUTO OFF")
            self.dmm.write(f"SENS:{config}:APER:ENAB OFF")
            self.dmm.write(f"SENS:{config}:NPLC {plc}") # MIN 0.006
            self.dmm.write(f"DATA:POIN:EVEN:THR {chunk_size}")
            self.dmm.write(f"SAMP:COUN {chunk_size}")
            self.dmm.write(f"SAMP:SOUR IMM")
            self.dmm.write(f"TRIG:SOUR BUS")
            self.dmm.write(f"TRIG:DEL 0")
            self.dmm.write(f"TRIG:COUN 1")
            self.dmm.write(f"FORM:DATA REAL,32")
            self.running = True
            header_ready = False
            start_time = time.time()
            total_lines = 0
            total_chunks = 0
            total_sample_rate = 0
            time.sleep(0.5)
            display(f"Waiting for reading. It may take a while...")
            while self.running:
                chunk_measurement_start_time = time.time()
                self.dmm.write("INIT")
                self.dmm.write("*TRG")
                self.dmm.query("*OPC?")
                chunk_measurement_time = time.time() - chunk_measurement_start_time
                sample_rate = round(chunk_size/chunk_measurement_time)
                if not header_ready:
                    header_ready = True
                    meta = f"config={config}|plc={plc}|range={range}|duration={duration}|chunk_size={chunk_size}|sample_rate={sample_rate}"
                    self.write_csv([meta])
                    display(meta)

                row = self.dmm.query_binary_values("R?", datatype='f', is_big_endian=True)
                row = [f"{value:.8f}" for value in row if value is not None]
                self.write_csv(row)
                total_lines += len(row)
                total_chunks += 1
                total_sample_rate += sample_rate
                
                if progress:
                    progress.max = duration
                    progress.value = int(time.time() - start_time)
                
                if time.time() - start_time >= duration:
                    display("Time is up, stopping...")
                    display(f"Total lines written: {total_lines}, duration: {time.time() - start_time:.2f} sec, chunks: {total_chunks}, sample rate: {total_sample_rate/total_chunks} records/sec")
                    self.running = False
        finally:
            time.sleep(2)
            if self.dmm:
                self.dmm.write("DISP ON")
                self.dmm.write("*RST")
                self.dmm = None
            self.close_csv_file()
            self.running = False    
        
        self.play_sound()
       
    def parse_definite_length_block(self, response_bytes):
        """
        Розпарсити відповідь від dmm на команду R?
        response_bytes: bytes – байтовий буфер з DMM (наприклад, dmm.read_raw())
        """
        if response_bytes[0] != ord(b'#'):
            raise ValueError("Неправильний формат: немає префіксу '#'")

        n_digits = int(chr(response_bytes[1]))
        length_field = int(response_bytes[2:2 + n_digits].decode())

        start = 2 + n_digits
        data_block = response_bytes[start:start + length_field]

        if length_field % 8 != 0:
            raise ValueError("Невідповідність: довжина не кратна 64-бітному float")

        num_values = length_field // 8
        values = np.frombuffer(data_block, dtype='>f8')  # '>f8' – big-endian float64
        return values
        
    def stop(self, btn=None):
        self.running = False 
        


In [37]:
import threading
import ipywidgets as widgets

global global_i
global_i = 0
def start_record_log(mes_options, plc, range, duration_sec, chunk_size, ip_address):
    stop_button = widgets.Button(description="Stop")
    progress = widgets.IntProgress(
        value=0,
        min=0,
        max=duration_sec,
        description='Запис:',
        bar_style='success', # 'success', 'info', 'warning', 'danger' or ''
        style={'bar_color': 'maroon'},
        orientation='horizontal'
    )
    log_recorder = LogRecorder(base_path, ip=ip_address)
    thread = threading.Thread(target=log_recorder.start, args=(mes_options, plc, range, duration_sec, chunk_size, progress))
    stop_button.on_click(log_recorder.stop)
    display(widgets.HBox([progress, stop_button]))
    thread.start()

def log_recorder():
    ip_address =  widgets.Text(description="IP", value=global_state.get('ip_address', "192.168.1.202"));
    ip_address.observe(lambda change: global_state.set('ip_address', ip_address.value))

    mes_options =  widgets.Dropdown(
        options=[
            ('DC Voltage', 'VOLT:DC'),
            ('DC Current', 'CURR:DC'),
            ('AC Voltage', 'VOLT:AC'),
            ('AC Current', 'CURR:AC'),
        ],
        value=global_state.get('mes_options', 'VOLT:DC'),
        description='Параметр:',
    )
    mes_options.observe(lambda change: global_state.set('mes_options', mes_options.value))

    plc =  widgets.Dropdown(
        options=[
            ('0.006 PLC (MAX)', '0.006'),
            ('0.02 PLC', '0.02'),
            ('0.06 PLC', '0.06'),
            ('0.2 PLC', '0.2'),
            ('1 PLC (DEF)', '1'),
            ('2 PLC', '2'),
            ('10 PLC', '10'),
            ('100 PLC (MIN)', '100'),
        ],
        value=global_state.get('plc', '1'),
        description='PLC:',
    )
    plc.observe(lambda change: global_state.set('plc', plc.value))
    
    chunk_size =  widgets.Dropdown(
        options=[
            ('1000', '1000'),
            ('5000', '5000'),
            ('10000', '10000'),
            ('20000', '20000'),
            ('50000', '50000'),
        ],
        value=global_state.get('chunk_size', '5000'),
        description='Буфер:',
    )
    chunk_size.observe(lambda change: global_state.set('chunk_size', chunk_size.value))
    duration_sec =  widgets.Dropdown(
        options=[
            ('5s', '5'),
            ('10s', '10'),
            ('30s', '30'),
            ('60s', '60'),
            ('90s', '90s'),
            ('180s', '180s'),
            ('1h', '3600'),
            ('2h', '7200'),
            ('3h', '10800'),
            ('6h', '21600'),
            ('12h', '43200'),
            ('24h', '86400'),
        ],
        value=global_state.get('duration_sec', '30'), 
        description='Час запису:',
    )
    duration_sec.observe(lambda change: global_state.set('duration_sec', duration_sec.value))

    def show_measurement_range(mes_option= None):
        # store selected state
        if mes_option == 'VOLT:DC' or mes_option == 'VOLT:AC':
            mes_range =  widgets.Dropdown(
                options=[
                    ('AUTO', 'AUTO'),
                    ('100 mV', '0.1'),
                    ('1 V', '1'),
                    ('10 V', '10'),
                    ('100 V', '100'),
                    ('1000 V (MAX)', '1000'),
                ],
                value=global_state.get('mes:range:'+mes_option, '0.001'),
                description='Діапазон:',
            )
        elif mes_option == 'CURR:DC' or mes_option == 'CURR:AC':
            mes_range =  widgets.Dropdown(
                options=[
                    ('AUTO', 'AUTO'),
                    ('100 µA', '0.0001'),
                    ('1 mA', '0.001'),
                    ('10 mA', '0.01'),
                    ('100 mA', '0.1'),
                    ('1 A', '1'),
                    ('3 A (MAX)', '3'),
                ],
                value=global_state.get('mes:range:'+mes_option, '0.001'),
                description='Діапазон:',
            )
        
        mes_range.observe(lambda change: global_state.set('mes:range:'+mes_option, mes_range.value))
        
        def button_handler(btn):
            start_record_log(mes_options=mes_option, plc=float(plc.value), range=mes_range.value, duration_sec=int(duration_sec.value), chunk_size=int(chunk_size.value), ip_address=ip_address.value)
        
        button = widgets.Button(description="Start")
        button.on_click(button_handler)
        ui = widgets.HBox([mes_range, button])
        display(ui)
    
    out = widgets.interactive_output(show_measurement_range, {'mes_option': mes_options})
    ui = widgets.VBox([
        widgets.HBox([ip_address, duration_sec, chunk_size]),
        widgets.HBox([mes_options, plc]),
    ])
    display(ui, out)

log_recorder()

VBox(children=(HBox(children=(Text(value='192.168.1.202', description='IP'), Dropdown(description='Час запису:…

Output()

HBox(children=(IntProgress(value=0, bar_style='success', description='Запис:', max=5, style=ProgressStyle(bar_…

'Create CSV file: ./01.07.2025/JUPYTER/DMM00000.csv'

'Підключено: 192.168.1.202, Agilent Technologies,34410A,MY53006581,2.40-2.40-0.09-46-09\n'

'Waiting for reading. It may take a while...'

'config=VOLT:DC|plc=0.02|range=10|duration=5|chunk_size=10000|sample_rate=3219'

'Time is up, stopping...'

'Total lines written: 20000, duration: 6.88 sec, chunks: 2, sample rate: 3226.5 records/sec'