In [32]:
import numpy as np
from time import sleep, time
from IPython.display import clear_output

import pylsl
import serial
from datetime import datetime

import os
import sys
import preprocessing as pp
from sklearn.decomposition import PCA
import pickle as pk
from catboost import CatBoostClassifier
from sklearn.preprocessing import StandardScaler
import keyboard
from pylsl import StreamInfo, StreamOutlet, local_clock

In [40]:
model = CatBoostClassifier()      # parameters not required.
model.load_model('train_stiffness/catboost_stiffness.cbm')
ss = pk.load(open("train_stiffness/scaler_stiffness.pkl",'rb'))

print("Script started")

num_sensors = 96
classes = {0:'Бутылка', 1:'Яблоко', 2:'Губка', 3:'Протез закрыт', 4:'Протез открыт'}
included_indices = [i for i in range(12,96) if i != 15]

try:
    # Serial setup
    ser = serial.Serial('COM3', 230400, timeout=1.0)
    print("Script started2")
    ser.flushInput()
except:
    pass

all_data = []


streams=pylsl.resolve_streams()
stream_flg=False
for i in streams:
    if i.name()=='Tapas':
        stream_flg=True
        print('Tapas exhists')
        break

if not stream_flg:
    info = StreamInfo('Tapas', 'float32')
    outlet = StreamOutlet(info)
    print('We created TAPAS')

Script started
Script started2
Tapas exhists


In [41]:
def read_serial_binary():
    """Read 96 binary values (2 bytes each) + checksum between < and > markers."""
    expected_bytes = num_sensors * 2 + 2  # 96 values × 2 bytes + 2 bytes for checksum
    buffer = b""
    start_time = time()
    timeout = 0.5 
    # Wait for start marker
    while True:
        
        if (time() - start_time) > timeout:
            print("Warning: Timeout waiting for start marker")
            return None
        char = ser.read(1)
        if char == b'<':
            break
        elif char == b'':
            continue
    
    # Read data and checksum
    data = ser.read(expected_bytes)
    buffer += data
    
    # Wait for end marker
    char = ser.read(1)
    if char != b'>':
        print("Warning: End marker not found")
        return None
    
    if len(data) != expected_bytes:
        print(f"Warning: Expected {expected_bytes} bytes, got {len(data)}")
        return None
    
    # Extract values and checksum
    values = np.frombuffer(data[:num_sensors * 2], dtype=np.uint16)  # 96 values
    received_checksum = np.frombuffer(data[num_sensors * 2:], dtype=np.uint16)[0]
    
    # Verify checksum
    computed_checksum = np.sum(values, dtype=np.uint16)
    if computed_checksum != received_checksum:
        print(f"Warning: Checksum mismatch - computed: {computed_checksum}, received: {received_checksum}")
        return None
    
    return values

def main_loop():
    name = 'SuperGlove' # Название источника
    sigtype = 'Predictions' # Название сигнала
    
    global all_data, obj, txt_file, iteration_count
    ser.flushInput()
    iteration_count = 0

    previous_prediction = 5 # Изначальное значение

    while True:
        ser.flushInput()
        count = 0
        sample_data = np.zeros((30, 96))
        while count < 30:
            ser.write(b'R')
            ser.flush()
            values = read_serial_binary()
            if values is None or len(values) != num_sensors:
                print(f"Warning: Expected {num_sensors} values, got {len(values) if values is not None else 0}")
                continue
            if all(value <= 1000 for value in values):
                numeric_arrays = np.where(values > 15, values, 0)
                sample_data[count] = numeric_arrays
                iteration_count += 1
                count += 1
            else:
                print("Warning: One or more sensor values exceeded the maximum allowed value of 1000.")
                continue

        thresh = 0.0023
        resistances = pp.transform_to_resistances(sample_data)
        smooth_resistances = pp.robust_smoothing(resistances)
        power = pp.compute_power(smooth_resistances)
        print(power)
        if power > thresh:
            scaled_data = ss.transform(smooth_resistances)
            data_for_model = pp.extract_features(scaled_data[:, included_indices])

            current_prediction = model.predict(data_for_model)
            clear_output()
            print(current_prediction[0])
            #print(type([current_prediction[0]]))
            if previous_prediction != current_prediction:
                outlet.push_sample([current_prediction[0]])
                #print(current_prediction[0])
            previous_prediction = current_prediction
        else:
            current_prediction = 5
            if previous_prediction != current_prediction:
                outlet.push_sample([current_prediction])
                #print(current_prediction)
                #print(type([current_prediction]))
            previous_prediction = current_prediction
            clear_output()
            
            print("Пусто")
            continue



In [42]:
print("Start")
main_loop() 
print(f"Final count: {iteration_count}")
if ser.is_open:
    ser.close()
    print("Serial port closed successfully after data integrity check.")
    clear_output()

Пусто


SerialException: GetOverlappedResult failed (PermissionError(13, 'Отказано в доступе.', None, 5))