In [1]:
pip install pymodbus

Note: you may need to restart the kernel to use updated packages.


### Эту команду прописать чтобы был доступ к порту USB

In [16]:
sudo chmod 666 /dev/ttyUSB0

SyntaxError: invalid syntax (1467591312.py, line 1)

In [1]:
import serial
from pymodbus.pdu import ModbusRequest
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.constants import Defaults

from time import sleep
from typing import List
import struct
import json


# Константы повторного опроса при пропаже соединенния или пакета
Defaults.RetryOnEmpty = True
Defaults.Timeout = 5
Defaults.Retries = 5

In [2]:
client = ModbusClient(method = 'rtu', 
                    port='/dev/ttyUSB0', 
                    baudrate=115200,
                    timeout=1, 
                    bytesize=8, 
                    stopbits=1)

vacuum_system_slave_id = 1

In [3]:
# кодирование и декодирование float -> [uint16, uint16]
def encode_float(value: float) -> List:
	float_in_bytes = bytearray(struct.pack("f", value))
	values = [] 
	for idx in range(0, len(float_in_bytes), 2): 
		values.append((float_in_bytes[idx]<<8) + float_in_bytes[idx + 1]) 
	
	return values 

def decode_to_float(data: List) -> float:
	barray = bytearray([]) 
	for value in data: 
		barray += bytearray([value>>8, value&0xFF]) 
	
	if isinstance(barray, list): 
		float_raw = struct.unpack("f", bytearray(barray))[0] 
	else:
		float_raw =  struct.unpack("f", barray)[0]

	return round(float_raw, 4)


def mb_connect():
    # првоеряем подключение, пытаемся подключиться 10 раз
    for i in range(10):
        connection = client.connect()
        if(not connection):
            print("Modbus not connected... try to connect again")
            sleep(0.1)
        else:
            print("Modbus connected succesfully")
            return connection

    return False

class VacuumModbusManager(object):
    def mb_read_state(self):
        connection = mb_connect()
        if not connection:
            return False, None

        state = client.read_holding_registers(0, 30, unit=vacuum_system_slave_id) # start_address, count, slave_id
        
        if state.isError():
            print('Unable to read state or there is the connection problem.')
            return False, None

        return True, state.registers


    def mb_read_pressure_state(self):
        connection = mb_connect()
        if not connection:
            return False, None

        state = client.read_holding_registers(22, 8, unit=vacuum_system_slave_id) # start_address, count, slave_id
        
        if state.isError():
            print('Unable to read state or there is the connection problem.')
            return False, None

        # return True, state.registers

        # convert raw bytes to float
        data = []
        data.append(decode_to_float(state.registers[0:2]))
        data.append(decode_to_float(state.registers[2:4]))
        data.append(decode_to_float(state.registers[4:6]))
        data.append(decode_to_float(state.registers[6:8]))

        return True, data


    def mb_write_default_params(self):
        connection = mb_connect()
        if not connection:
            return False
        
        default_parameters = [600, 200, 850, 1000, 20000, 50]

        # записываем регистры и проверяем ответ
        # TODO: сделать несколько попыток
        response = client.write_registers(4, default_parameters, unit=vacuum_system_slave_id)
        if response.isError():
            print('Unable to write default params or there is the connection problem.')
            return False
        return True


    def mb_write_single_register(self, adress, data):
        connection = mb_connect()
        if not connection:
            return False
    
        response = client.write_register(adress, int(data), unit=vacuum_system_slave_id)  
        if response.isError():
            print('Unable to write command or there is the connection problem.')
            return False

        return True

# 
vacuum_mb = VacuumModbusManager()

## Conection

In [4]:
mb_connect()

Modbus connected succesfully


True

## Все регистры (состояние системы)

In [5]:
is_connect, state = vacuum_mb.mb_read_state()
result = {}
for i in range(30):
    result[i] = state[i]


result = json.dumps(result, indent=4, sort_keys=True)
print(result)

Modbus connected succesfully
{
    "0": 1,
    "1": 0,
    "2": 1,
    "3": 0,
    "4": 600,
    "5": 200,
    "6": 600,
    "7": 1000,
    "8": 20000,
    "9": 50,
    "10": 10,
    "11": 0,
    "12": 0,
    "13": 1,
    "14": 0,
    "15": 0,
    "16": 0,
    "17": 0,
    "18": 0,
    "19": 0,
    "20": 0,
    "21": 0,
    "22": 44603,
    "23": 2116,
    "24": 40954,
    "25": 26436,
    "26": 38074,
    "27": 7236,
    "28": 2186,
    "29": 32068
}


## Показания датчиков давления

In [10]:
is_connect, state = vacuum_mb.mb_read_pressure_state()
result = {}
for i in range(4):
    result[i] = state[i]


result = json.dumps(result, indent=4, sort_keys=True)
print(result)

Modbus connected succesfully
{
    "0": 547.6501,
    "1": 927.8745,
    "2": 629.4133,
    "3": 1014.1259
}


## Включить ресивер

In [128]:
is_mb_connect = vacuum_mb.mb_write_single_register(0, 1)
is_mb_connect

Modbus connected succesfully


True

## Выключить ресивер

In [110]:
is_mb_connect = vacuum_mb.mb_write_single_register(0, 0)
is_mb_connect

Modbus connected succesfully


True

## Включить 1 стол

In [90]:
is_mb_connect = vacuum_mb.mb_write_single_register(1, 1)
is_mb_connect

Modbus connected succesfully


True

## Выключить 1 стол

In [109]:
is_mb_connect = vacuum_mb.mb_write_single_register(1, 0)
is_mb_connect

Modbus connected succesfully


True

## Включить 2 стол

In [124]:
is_mb_connect = vacuum_mb.mb_write_single_register(2, 1)
is_mb_connect

Modbus connected succesfully


True

## Выключить 2 стол

In [11]:
is_mb_connect = vacuum_mb.mb_write_single_register(2, 0)
is_mb_connect

Modbus connected succesfully


True

## Включить 3 стол

In [125]:
is_mb_connect = vacuum_mb.mb_write_single_register(3, 1)
is_mb_connect

Modbus connected succesfully


True

## Выключить 3 стол

In [108]:
is_mb_connect = vacuum_mb.mb_write_single_register(3, 0)
is_mb_connect

Modbus connected succesfully


True