# Init

In [63]:
# IMPORT ALL LIB 
import subprocess
import numpy as np  
import matplotlib.pyplot as plt 

#%pip install pyvisa
#%pip install pyvisa-py
import pyvisa  
#print("pyvisa==",pyvisa.__version__)

In [7]:
scope_ip = "169.254.104.98" 

In [65]:


def ping_oscilloscope(ip="169.254.229.238", count=1):
    try:
        result = subprocess.run(
            ["ping", "-n", str(count), ip],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,  # decode output as text
            shell=True
        )
        if result.returncode == 0:
            print("Ping successful:")
        else:
            print("Ping failed:")
        print(result.stdout)
    except Exception as e:
        print("Error pinging oscilloscope:", e)

#ping_oscilloscope(ip = scope_ip )

In [80]:
rm = pyvisa.ResourceManager('@py') # Use pyvisa-py backend  

 
resource_str = f"TCPIP::{scope_ip}::INSTR"  
try:  
    scope.close()  
except: pass  

Try_N_time= 5
for i in range( Try_N_time): 
    try:  
        scope = rm.open_resource(resource_str)  
        print(i, "Connected to:", scope.query("*IDN?"))  
        break 
    except Exception as e:  
        print("Connection failed:", e)  

0 Connected to: TEKTRONIX,TDS 3034B,0,CF:91.1CT FV:v3.35 TDS3FFT:v1.00 TDS3TRG:v1.00



In [88]:
# Safe: ASCII SCPI dump
def save_setup(title):
    setup_ascii = scope.query("*LRN?")
    with open(f"{title}_setup.txt", "w", encoding="utf-8") as f:
        f.write(setup_ascii)
#print("Saved ASCII SCPI to scope_setup_lrn.txt")

In [91]:
def load_setup(title):
    filename = f"{title}_setup.txt"
    with open(filename, "r", encoding="utf-8") as f:
        setup_text = f.read()

    # Split on semicolons ; Tektronix uses ";" to chain commands
    commands = setup_text.replace("\n", "").split(";")

    for cmd in commands:
        cmd = cmd.strip()
        if not cmd:
            continue  # skip empty
        try:
            scope.write(cmd)
        except Exception as e:
            print(f"Failed: {cmd} -> {e}")
# Reload setup from file
#load_setup(scope, "scope_setup_lrn.txt")
#print("Setup restored to oscilloscope.")


In [89]:
# save_setup(title)
# DONE 

In [97]:
#save_setup(title)

In [93]:
# load_setup(title)
# DONE 

In [68]:
def det_data(chanel = "CH1"): 
    scope.write("DATA:SOURCE "+chanel)  
    # Set binary format  
    scope.write("DATA:ENC RIB") # Signed binary  
    scope.write("DATA:WIDTH 2") # 2 byte per point  
    
    # Read waveform settings for scaling  
    x_increment = float(scope.query("WFMPRE:XINCR?"))  
    x_origin = float(scope.query("WFMPRE:XZERO?"))  
    y_increment = float(scope.query("WFMPRE:YMULT?"))  
    y_origin = float(scope.query("WFMPRE:YZERO?"))  
    y_offset = float(scope.query("WFMPRE:YOFF?"))  
    
    
    # set your desired record length  
    scope.write("DATA:START 1")  
    scope.write("DATA:STOP 10000")  
    
    # this issues the CURVE? query and returns a numpy int16 array  
    raw = scope.query_binary_values(  
    'CURVE?',  
    datatype='h', # 'h' = signed 16â€‘bit  
    is_big_endian=True, # or False depending on your scope  
    container=np.array  
    )  
    
    
    # Convert bytes to numpy array  
    wave = np.array( raw)  
    
    # Scale data  
    voltages = (wave - y_offset) * y_increment + y_origin  
    
    times = np.arange(len(voltages)) * x_increment + x_origin 
    return times, voltages
#det_data(chanel = "CH1")

In [69]:
def get_active_channels():
    active_channels = []
    for i in range(1, 5):  # assuming up to CH4
        try:
            response = scope.query(f"SELECT:CH{i}?").strip()
            if response in ['1', 'ON']:  # depending on oscilloscope
                active_channels.append(f"CH{i}")
        except Exception as e:
            print(f"Error querying CH{i}: {e}")
    return active_channels
#chanels = get_active_channels()
#chanels

In [70]:
chanels

['CH1']

In [71]:
#chanels = ['CH1', 'CH2']
def get_probe_config(chanels): 
    #params = ["PROBe", "IMPEDANCE", "COUPLING"]
    params = ["PROBe", "IMPEDANCE", "COUPLING", 
              "OFFSET", "POSITION", "BANDWIDTH", "INVERT", "SCALE"]
    DIC = {}
    for c in chanels: 
        dic ={}
        cu= c.upper()
        for p in params: 
            try:
                dic[p]= scope.query(cu+":"+p+"?").replace("\n", "")
            except: pass
        DIC[c]= dic
    params = ["HORIZONTAL:MAIN:SCALE?", "WFMPRE:NR_Pt?"]
    dic={}
    for p in params: 
        try: 
            dic[p]= scope.query(p).replace("\n", "")
        except: pass
    DIC["general"]= dic


    params = [
        "TRIGger:STATE?",
        "TRIGger:MAIn:EDGE:SOURce?",
        "TRIGger:MAIn:EDGE:SLOPe?",
        "TRIGger:MAIn:EDGE:COUPling?",
        "TRIGger:MAIn:LEVel?"
    ]
    dic={}
    for p in params: 
        try: 
            dic[p]= scope.query(p).replace("\n", "")
        except: pass
    DIC["trigger"]= dic
    
    return DIC
#get_probe_config(chanels)

In [17]:
def get_meas():
    arr = {}
    for i in range(1,5):  
        data, status = scope.query(f"MEASurement:MEAS{i}:DATA?").split(',')  
        mtype = scope.query(f"MEASurement:MEAS{i}:TYPE?").strip()
        source = scope.query(f"MEASurement:MEAS{i}:SOURCE1?").strip()
        if int(status) ==0:  
            #print("#", source,mtype, data)  
            try:
                arr[source][mtype]=data
            except:
                arr[source] = {mtype:data}

    return arr
#get_meas()              

In [18]:
import json


def get_params_txt(title=None):
    chanels = get_active_channels()
    dic  =get_probe_config(chanels)
    dic["measurements"]= get_meas() 
    if title is not None: 
        # Save to JSON file
        with open(title+".json", "w") as f:
            json.dump(dic, f, indent=4)
    return dic 
#_= get_params_txt("test")
#_

In [54]:
import pandas as pd 
def get_signals(title):
    chanels = get_active_channels()
    refs = get_active_refs()

    DIC = {}
    for c in chanels: 
        dic ={}
        cu= c.upper()
        time, voltage = det_data(chanel = cu)
        DIC["time"]= time
        DIC[cu]= voltage
    for c in refs :
        cu= c.upper()
        time, voltage = det_data(chanel = cu)
        DIC["time"]= time
        DIC[cu]= voltage
    
    df = pd.DataFrame(DIC)
    df.to_csv(title+".csv", index = False) 
    return df


In [9]:
import requests
def get_image(title):
    url = url = f"http://{scope_ip}/Image.png"
    
    # Download the image as binary you can add timout=10
    response = requests.get(url, stream=True)
    response.raise_for_status()  # check for HTTP errors
    with open(f"{title}.png", "wb") as f:
        f.write(response.content)
requests.__version__
# '2.32.5'

'2.32.5'

In [11]:
import urllib3, certifi, sys
print("urllib3:", urllib3.__version__)
print("certifi:", certifi.__version__)
print("Python:", sys.version)


urllib3: 1.26.20
certifi: 2025.07.14
Python: 3.13.5 (tags/v3.13.5:6cb20a2, Jun 11 2025, 16:15:46) [MSC v.1943 64 bit (AMD64)]


In [45]:
def get_active_refs():
    active_refs = []
    for i in range(1, 5):  # REF1..REF4
        try:
            response = scope.query(f"SELECT:REF{i}?").strip()
            if response in ['1', 'ON']:
                active_refs.append(f"REF{i}")
        except Exception as e:
            print(f"Error querying REF{i}: {e}")
    return active_refs


# MEAS

In [107]:
title = "meas12.4khz"

In [108]:
%%time 
get_image(title)

CPU times: total: 0 ns
Wall time: 1.94 s


In [109]:
%%time 
df = get_signals(title)

CPU times: total: 15.6 ms
Wall time: 59.1 ms


In [110]:
%%time 
dic= get_params_txt(title)

CPU times: total: 15.6 ms
Wall time: 190 ms


In [111]:
%%time
save_setup(title)

CPU times: total: 0 ns
Wall time: 349 ms


In [114]:
import psutil
import platform

def list_disks():
    disks = []
    for part in psutil.disk_partitions(all=False):
        info = {
            "device": part.device,
            "mountpoint": part.mountpoint,
            "fstype": part.fstype,
            "opts": part.opts
        }

        # detect removable / USB
        removable = False

        if platform.system() == "Windows":
            # on Windows -> removable drives often have 'removable' in opts
            if "removable" in part.opts.lower():
                removable = True
        else:
            # on Linux -> many USB sticks are mounted under /media or /run/media
            if part.mountpoint.startswith("/media") or part.mountpoint.startswith("/run/media"):
                removable = True

        info["removable"] = removable
        disks.append(info)
    return disks


if __name__ == "__main__":
    for d in list_disks():
        tag = " (USB flash)" if d["removable"] else ""
        print(f"{d['device']} mounted at {d['mountpoint']} [{d['fstype']}] {tag}")
# you must use d['mountpoint'] to write your file 

C:\ mounted at C:\ [NTFS] 


In [115]:
d['mountpoint']

'C:\\'