In [25]:
import socket

# Configuration
OSC_HOST = "mso58-erb010931"
OSC_PORT = 4000  # TekScope SCPI port
# SCPI_COMMAND = "*IDN?\n"  # Query identification
SCPI_COMMAND = ":CH1:SCALE 5e-02\n"  # Query identification

try:
    with socket.create_connection((OSC_HOST, OSC_PORT), timeout=2) as sock:
        print(f"Connected to {OSC_HOST}:{OSC_PORT}")
        sock.sendall(SCPI_COMMAND.encode())
        response = sock.recv(1024).decode().strip()
        print(f"Response: {response}")
except Exception as e:
    print(f"Failed to connect or communicate: {e}")


Connected to mso58-erb010931:4000
Failed to connect or communicate: timed out


In [12]:
import socket

OSC_HOST = "mso58-erb010931"
OSC_PORT = 4000
TIMEOUT = 2  # seconds

def send_scpi_command(cmd):
    """Envoie une commande SCPI (pas de réponse attendue)"""
    try:
        with socket.create_connection((OSC_HOST, OSC_PORT), timeout=TIMEOUT) as sock:
            sock.sendall((cmd + '\n').encode())
            print(f"Envoyé : {cmd}")
    except Exception as e:
        print(f"Erreur d'envoi : {e}")

def query_scpi(cmd):
    """Envoie une commande SCPI et lit la réponse"""
    try:
        with socket.create_connection((OSC_HOST, OSC_PORT), timeout=TIMEOUT) as sock:
            sock.sendall((cmd + '\n').encode())
            response = sock.recv(4096).decode().strip()
            # print(f"Commande : {cmd}\nRéponse  : {response}")
            return response
    except Exception as e:
        print(f"Erreur de requête : {e}")
        return None

cmd_id = "*IDN?\n"  
cmd2 = ":HOR:MAIN:SCALE?"
# send_scpi_command(cmd_id)
rep = query_scpi(cmd2)
print(rep)

20.0000E-3


In [None]:
import socket
import serial

# Configuration du port série
PORT = 'COM3'  # Remplace par ton port réel, exemple: /dev/ttyUSB0 sous Linux/Mac
BAUDRATE = 115200
scope = socket.create_connection(("mso58-erb010931", 4000), timeout=2)


def send(cmd):
    try:
        scope.sendall((cmd + "\n").encode())
        print(f"Envoyé : {cmd}")
    except Exception as e:
        print(f"[Erreur SEND] {e}")

def query(cmd):
    try:
        scope.sendall((cmd + "\n").encode())
        response = scope.recv(4096).decode().strip()
        print(f"{cmd} → {response}")
        return response
    except Exception as e:
        print(f"[Erreur QUERY] {e}")
        return None
    
# send(":CH")
# query(":HOR:MAIN:SCALE?")
# rep = query(":SELECT:CH1?")
# if(rep == '1'):
#     send(":SELECT:CH1 OFF")
# idn = query("*IDN?")
# print("Oscilloscope ID:", idn)
# send(":ACQuire:STATE RUN")

try:
    ser = serial.Serial(PORT, BAUDRATE, timeout=1)
    print(f"Écoute du port {PORT} à {BAUDRATE} bauds...\n")

    while True:
        if ser.in_waiting:
            line = ser.readline().decode(errors='ignore').strip()
            if line:
                print(f"Reçu : {line}")
                send(line)

except KeyboardInterrupt:
    print("\nArrêt manuel.")
except Exception as e:
    print(f"Erreur : {e}")
finally:
    if 'ser' in locals() and ser.is_open:
        ser.close()
        print("Port série fermé.")


Écoute du port COM3 à 115200 bauds...

Reçu : Button 1 clicked
Envoyé : Button 1 clicked
Reçu : :SELECT:CH8 ON
Envoyé : :SELECT:CH8 ON
Reçu : Button 0 clicked
Envoyé : Button 0 clicked
Reçu : :SELECT:CH8 OFF
Envoyé : :SELECT:CH8 OFF
Reçu : Encoder 1 moved to -1
Envoyé : Encoder 1 moved to -1
Reçu : Encoder 1 moved to -2
Envoyé : Encoder 1 moved to -2
Reçu : Encoder 1 moved to -3
Envoyé : Encoder 1 moved to -3
Reçu : Encoder 1 moved to -4
Envoyé : Encoder 1 moved to -4
Reçu : Encoder 1 moved to -2
Envoyé : Encoder 1 moved to -2
Reçu : Encoder 1 moved to -1
Envoyé : Encoder 1 moved to -1
Reçu : Encoder 1 moved to -2
Envoyé : Encoder 1 moved to -2
Reçu : Button 1 clicked
Envoyé : Button 1 clicked
Reçu : :SELECT:CH8 ON
Envoyé : :SELECT:CH8 ON
Reçu : Encoder 1 moved to -3
Envoyé : Encoder 1 moved to -3
Reçu : Button 0 clicked
Envoyé : Button 0 clicked
Reçu : :SELECT:CH8 OFF
Envoyé : :SELECT:CH8 OFF
Reçu : Button 1 clicked
Envoyé : Button 1 clicked
Reçu : :SELECT:CH8 ON
Envoyé : :SELECT:CH8 

In [None]:
import serial
import socket
import time

# --- Config ---
SERIAL_PORT = 'COM3'  # Adapte ton port série
BAUDRATE = 115200
OSC_HOST = 'mso58-erb010931'
OSC_PORT = 4000

# --- Listes de Scale disponibles ---
time_scale_list = [1e-9, 2e-9, 5e-9, 10e-9, 20e-9, 50e-9, 100e-9, 200e-9, 500e-9,
              1e-6, 2e-6, 5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6, 500e-6,
              1e-3, 2e-3, 5e-3, 10e-3, 20e-3, 50e-3, 100e-3, 200e-3, 500e-3]

volt_scale_list = [1e-3, 2e-3, 5e-3, 10e-3, 20e-3, 50e-3, 100e-3, 200e-3, 500e-3,
               1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000]


# --- Etat Interne ---
selected_channel = None
channel_states = [False] * 8
current_time_scale_index = 0  # Sera synchronisé au démarrage
current_volt_scale_index = 0  # Sera synchronisé au démarrage
position_step = 0.01  # Ajuste selon besoin

# --- Fonctions SCPI ---
def send_scpi(cmd):
    try:
        with socket.create_connection((OSC_HOST, OSC_PORT), timeout=2) as sock:
            sock.sendall((cmd + "\n").encode())
            print(f"SCPI Sent: {cmd}")
    except Exception as e:
        print(f"[SCPI Error] {e}")

def query_scpi(cmd):
    try:
        with socket.create_connection((OSC_HOST, OSC_PORT), timeout=2) as sock:
            sock.sendall((cmd + "\n").encode())
            response = sock.recv(4096).decode().strip()
            print(f"SCPI Query: {cmd} → {response}")
            return response
    except Exception as e:
        print(f"[SCPI Query Error] {e}")
        return None

# --- Synchro Scale au démarrage ---
def sync_scale():
    global current_time_scale_index
    response = query_scpi(":HORizontal:MAIN:SCALE?")
    try:
        val = float(response)
        if val in time_scale_list:
            current_scale_index = time_scale_list.index(val)
            print(f"Scale synchronisé : {val} s/div")
        else:
            print(f"Scale {val} inconnu, liste personnalisée à adapter.")
    except:
        print("Impossible de lire le scale depuis le scope.")

    global current_volt_scale_index
    response = query_scpi(f":CH1:SCALE?").strip()
    try:
        val = float(response)
        if val in volt_scale_list:
            current_volt_scale_index = volt_scale_list.index(val)
            print(f"Scale synchronisé : {val} volts/div")
        else:
            print(f"Scale {val} inconnu, liste personnalisée à adapter.")
    except:
        print("Impossible de lire le scale depuis le scope.")

# --- Gestion des événements ---
def handle_button(button_index):
    global selected_channel
    channel = button_index + 1
    if selected_channel != channel:
        selected_channel = channel
        state = query_scpi(f":SELECT:CH{channel}?")
        state = "ON" if state.strip() == "1" else "OFF"
        print(f"Canal CH{channel} sélectionné. État : {state}")
        if state == "OFF":
            state = "ON"
            send_scpi(f":SELECT:CH{channel} {state}")
    else:
        state = query_scpi(f":SELECT:CH{channel}?")
        state = "OFF" if state.strip() == "1" else "ON"
        send_scpi(f":SELECT:CH{channel} {state}")
        print(f"CH{channel} {state}")

def handle_encoder(encoder_index, delta):
    global current_time_scale_index
    global current_volt_scale_index

    if selected_channel is None:
        print("Aucun canal sélectionné.")
        return

    if encoder_index == 0:
        send_scpi(f":HORizontal:MAIN:POS {delta * position_step}")

    elif encoder_index == 1:
        new_index = current_time_scale_index + delta
        new_index = max(0, min(len(time_scale_list) - 1, new_index))
        if new_index != current_time_scale_index:
            current_time_scale_index = new_index
            new_scale = time_scale_list[current_time_scale_index]
            send_scpi(f":HORizontal:MAIN:SCALE {new_scale}")
            print(f"Scale Horizontal → {new_scale} s/div")

    elif encoder_index == 2:
        send_scpi(f":CH{selected_channel}:POS {delta * position_step}")

    elif encoder_index == 3:
        new_index = current_volt_scale_index + delta
        new_index = max(0, min(len(volt_scale_list) - 1, new_index))
        if new_index != current_volt_scale_index:
            current_volt_scale_index = new_index
            new_scale = time_scale_list[current_volt_scale_index]
            send_scpi(f":CH{selected_channel}:SCALE {new_scale}")
            print(f"Scale Horizontal → {new_scale} s/div")

# --- Boucle principale ---
try:
    ser = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1)
    print(f"Écoute sur {SERIAL_PORT} à {BAUDRATE} bauds...")

    sync_scale()  # Lire le scale initial à la connexion

    while True:
        line = ser.readline().decode(errors='ignore').strip()
        if not line:
            continue

        print(f"Reçu: {line}")

        if line.startswith("Button "):
            parts = line.split()
            if len(parts) >= 3 and parts[2] == "pressed":
                handle_button(int(parts[1]))

        elif line.startswith("Encoder "):
            parts = line.split()
            if len(parts) >= 3:
                index = int(parts[1])
                delta = int(parts[2])
                handle_encoder(index, delta)

except KeyboardInterrupt:
    print("\nArrêt manuel.")
except Exception as e:
    print(f"[Erreur] {e}")
finally:
    if 'ser' in locals() and ser.is_open:
        ser.close()
        print("Port série fermé.")


Écoute sur COM3 à 115200 bauds...
SCPI Query: :HORizontal:MAIN:SCALE? → 4.0000E-9
Scale 4e-09 inconnu, liste personnalisée à adapter.
SCPI Query: :CH1:SCALE? → 25.0000E-3
Scale 0.025 inconnu, liste personnalisée à adapter.
Reçu: Button 0 pressed
SCPI Query: :SELECT:CH1? → 1
Canal CH1 sélectionné. État : ON
Reçu: Encoder 1 +1
1
SCPI Sent: :HORizontal:MAIN:SCALE 2e-09
Scale Horizontal → 2e-09 s/div
Reçu: Encoder 1 -1
-1
SCPI Sent: :HORizontal:MAIN:SCALE 1e-09
Scale Horizontal → 1e-09 s/div
Reçu: Encoder 1 +1
1
SCPI Sent: :HORizontal:MAIN:SCALE 2e-09
Scale Horizontal → 2e-09 s/div
Reçu: Encoder 1 +1
1
SCPI Sent: :HORizontal:MAIN:SCALE 5e-09
Scale Horizontal → 5e-09 s/div

Arrêt manuel.
Port série fermé.
