In [None]:
pip install pymodbus

In [None]:
import serial
from pymodbus.pdu import ModbusRequest
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.constants import Defaults

from time import sleep
import json

# Retry/timeout defaults used when the connection or a packet is lost
Defaults.RetryOnEmpty = True
Defaults.Timeout = 5
Defaults.Retries = 5

# Modbus RTU client over the UART / serial port
# NOTE(review): timeout=1 below differs from Defaults.Timeout = 5 above — confirm which is intended
client = ModbusClient(method = 'rtu', 
                    port='/dev/ttyUSB0', 
                    baudrate=115200,
                    timeout=1, 
                    bytesize=8, 
                    stopbits=1)

# Modbus unit (slave) ID passed as `unit=` to every request in this notebook
doors_and_film_system_slave_id = 1

# Utils

### Проверка подключения

In [None]:
def mb_connect():
    """Try to open the module-level Modbus ``client`` connection.

    Makes up to 10 attempts. After every failed attempt a message is printed
    and the function sleeps for 0.1 s before trying again.

    Returns:
        The truthy value returned by ``client.connect()`` on the first
        successful attempt, or ``False`` if all 10 attempts fail.
    """
    attempts_left = 10
    while attempts_left > 0:
        attempts_left -= 1
        connected = client.connect()
        if connected:
            return connected
        print("Modbus not connected... try to connect again")
        sleep(0.1)

    return False

In [None]:
# Open the Modbus connection now; the notebook displays the value returned by mb_connect().
mb_connect()

# ДВЕРИ

In [None]:
class DoorModbusManager(object):
    """Read and write the door controller's Modbus holding registers.

    Every method uses the module-level ``client``, ``mb_connect()`` and
    ``doors_and_film_system_slave_id``. Failures are reported by printing a
    message and returning a false result rather than by raising.
    """

    # Holding-register addresses used by the methods of this class.
    _REG_COMMAND = 0
    _REG_EXTRA_STEP = 3
    _REG_RESOLUTION_MOTOR_1 = 9
    _REG_RESOLUTION_MOTOR_2 = 10
    # mb_read_state() reads registers 0..12 in one request.
    _STATE_REGISTER_COUNT = 13

    _MSG_NO_CONNECTION = 'Unable to connect door and film module via Modbus and RS-485.'
    _MSG_IO_ERROR = 'Unable to read or write Modbus registers. Error:\n %s'
    _MSG_PARAMS_ERROR = 'Unable to write params or there is the connection problem.'

    @staticmethod
    def _resolution_for_speed(speed):
        """Map a speed value to the step-resolution value written to the drive.

        Higher speeds map to smaller resolution values: >50 -> 1, >25 -> 2,
        >12 -> 4, >6 -> 8, >3 -> 16, anything else -> 32.

        The previous implementation required ``speed < 100`` for resolution 1,
        so ``speed >= 100`` fell through to 32; such values now map to 1.
        NOTE(review): confirm that clamping speeds above 100 to resolution 1
        is the behaviour wanted for this drive.
        """
        if speed > 50:
            return 1
        if speed > 25:
            return 2
        if speed > 12:
            return 4
        if speed > 6:
            return 8
        if speed > 3:
            return 16
        return 32

    def mb_read_state(self):
        """Read holding registers 0..12 of the door controller.

        Returns:
            ``(True, registers)`` on success, where ``registers`` is the
            ``registers`` attribute of the pymodbus response;
            ``(False, None)`` if the connection or the read failed.
        """
        if not mb_connect():
            print(self._MSG_NO_CONNECTION)
            return False, None

        # Arguments: start address, register count, slave id.
        state = client.read_holding_registers(
            0, self._STATE_REGISTER_COUNT, unit=doors_and_film_system_slave_id)

        if state.isError():
            # Apply the format explicitly: print() does not interpolate "%s".
            print(self._MSG_IO_ERROR % (state,))
            return False, None

        return True, state.registers

    def mb_write_default_params(self):
        """Write the default extra-step, step-resolution and command values.

        Previously all three writes targeted address 3, so each one overwrote
        the one before it. The step resolution now goes to registers 9 and 10
        (the registers mb_write_speed() uses) and the command to register 0
        (the register mb_write_command() uses).
        NOTE(review): the extra-step address (3) is kept from the original
        code — confirm it against the controller's register map.

        Returns:
            ``True`` if every write succeeded, ``False`` otherwise.
        """
        if not mb_connect():
            print(self._MSG_NO_CONNECTION)
            return False

        extra_step_default_data = [12, 6]
        step_res_default_data = [1, 1]
        command_default_data = 0

        # Write the registers and check each response.
        # TODO: make several attempts
        multi_register_writes = (
            (self._REG_EXTRA_STEP, extra_step_default_data),
            (self._REG_RESOLUTION_MOTOR_1, step_res_default_data),
        )
        for address, values in multi_register_writes:
            response = client.write_registers(
                address, values, unit=doors_and_film_system_slave_id)
            if response.isError():
                print(self._MSG_PARAMS_ERROR)
                return False

        # The command is a single value, so use the single-register write,
        # exactly as mb_write_command() does.
        response = client.write_register(
            self._REG_COMMAND, command_default_data,
            unit=doors_and_film_system_slave_id)
        if response.isError():
            print(self._MSG_PARAMS_ERROR)
            return False

        return True

    def mb_write_command(self, command):
        """Write ``int(command)`` to the command register (address 0).

        Returns:
            ``True`` on success, ``False`` if the connection or write failed.
        """
        if not mb_connect():
            print(self._MSG_NO_CONNECTION)
            return False

        response = client.write_register(
            self._REG_COMMAND, int(command), unit=doors_and_film_system_slave_id)
        if response.isError():
            print(self._MSG_IO_ERROR % (response,))
            return False

        return True

    def mb_write_speed(self, motor, speed):
        """Set the step resolution of one motor from a speed value.

        Args:
            motor: ``1`` selects register 9; any other value selects
                register 10.
            speed: speed value, converted by ``_resolution_for_speed()``.

        Returns:
            ``True`` on success, ``False`` if the connection or write failed.
        """
        if not mb_connect():
            print(self._MSG_NO_CONNECTION)
            return False

        resolution = self._resolution_for_speed(speed)
        if motor == 1:
            address = self._REG_RESOLUTION_MOTOR_1
        else:
            address = self._REG_RESOLUTION_MOTOR_2

        response = client.write_register(
            address, int(resolution), unit=doors_and_film_system_slave_id)
        if response.isError():
            print(self._MSG_IO_ERROR % (response,))
            return False

        return True


# Shared manager instance used by the cells below.
doors_mb = DoorModbusManager()

### Сброс ошибки

In [None]:
# Write 0 to holding register 11; state_high_level() treats a non-zero value there as an error.
client.write_register(11, 0, unit=doors_and_film_system_slave_id)

### Настройка скорости работы дверей

In [None]:
# Write 1000 to holding register 12 (door operating speed per the section heading; units are not shown here).
client.write_register(12, 1000 , unit=doors_and_film_system_slave_id)

### Состояние для REST API 
### TODO: выводит неправильные данные при открытии и закрытии дверей

In [None]:
def state_high_level():
    """Translate the raw door registers into three status strings.

    Reads the registers through ``doors_mb.mb_read_state()`` and interprets
    register 11 as the error flag and register 0 as the current command.

    Returns:
        A tuple ``(error, state_upper, state_lower)``. ``error`` is
        ``"error"`` or ``"not error"``; both state strings always carry the
        same value.

    NOTE(review): command value 1 is labelled "open"/"opening" and 2 is
    labelled "closed"/"closing" here, while the notebook's "open door" cell
    sends command 2 — the labels may be swapped; confirm before relying on
    the wording.
    """
    connected, regs = doors_mb.mb_read_state()

    if not connected:
        print("Not connected")
        return "error", "error", "error"

    # A non-zero error register overrides everything else.
    if regs[11] != 0:
        return "error", "error", "error"

    command = regs[0]
    if command == 0:
        label = "door is unhold"
    elif command == 1:
        # Finished once registers 6 and 8 equal registers 3 and 4.
        reached = regs[6] == regs[3] and regs[8] == regs[4]
        label = "door open without errors" if reached else "door is opening"
    elif command == 2:
        # Finished once registers 5 and 7 equal registers 1 and 2.
        reached = regs[5] == regs[1] and regs[7] == regs[2]
        label = "door closed without errors" if reached else "door is closing"
    else:
        # Unknown command value: the states report an error, the flag does not.
        print("some logic error occured")
        label = "error"

    return "not error", label, label

# Состояние системы

In [None]:
# Read the door registers and, if the read succeeded, pretty-print them as JSON.
is_connect, state = doors_mb.mb_read_state()

if is_connect:
    # `state` holds the register values returned by mb_read_state().
    result = json.dumps(state, indent=4, sort_keys=True)
    print(result)

In [None]:
# Display the (error, state_upper, state_lower) tuple for the doors.
state_high_level()

# Открыть дверь

In [None]:
# Send command 2 (open the door, per the section heading); result is the True/False success flag.
result = doors_mb.mb_write_command(2)

# Закрыть дверь

In [None]:
# Send command 1 (close the door, per the section heading); result is the True/False success flag.
result = doors_mb.mb_write_command(1)

# Отпустить дверь

In [None]:
# Send command 0 (release the door, per the section heading); result is the True/False success flag.
result = doors_mb.mb_write_command(0)

# Открыть и закрыть дверь N раз

In [None]:
def open_test():
    """Send the open command (2) and wait until the doors finish moving.

    Polls ``state_high_level()`` every 0.5 s until either state string equals
    ``'door closed without errors'`` — the string ``state_high_level()``
    reports for command value 2 once the positions match.
    NOTE(review): that wording looks inverted relative to "open"; it is kept
    because it is what ``state_high_level()`` currently returns.

    The function gives up (after printing a message) when the initial state
    is an error, when the command cannot be written, or when an error shows
    up while waiting. Previously the last two cases left the loop polling
    forever.

    Returns:
        None.
    """
    target_state = 'door closed without errors'

    error, upper_state, lower_state = state_high_level()
    if error == "error":
        print("some error")
        # TODO: print error num
        return

    # Open the door; without a successful command write there is nothing to wait for.
    if not doors_mb.mb_write_command(2):
        print("some error")
        return
    print("OPENING...")

    while upper_state != target_state and lower_state != target_state:
        error, upper_state, lower_state = state_high_level()
        if error == "error":
            # The target state can no longer be reached; stop instead of hanging.
            print("some error")
            return
        sleep(0.5)

    print("OPEN")
    return

def close_test():
    """Send the close command (1) and wait until the doors finish moving.

    Polls ``state_high_level()`` every 0.5 s until either state string equals
    ``"door open without errors"`` — the string ``state_high_level()``
    reports for command value 1 once the positions match.
    NOTE(review): that wording looks inverted relative to "close"; it is kept
    because it is what ``state_high_level()`` currently returns.

    Previously a ``return`` inside the loop body ended the function after a
    single poll, so it never actually waited for the movement to finish.
    The function now also gives up (after printing a message) when the
    command cannot be written or an error shows up while waiting.

    Returns:
        None.
    """
    target_state = "door open without errors"

    error, upper_state, lower_state = state_high_level()
    if error == "error":
        print("some error")
        # TODO: print error num
        return

    # Close the door; without a successful command write there is nothing to wait for.
    if not doors_mb.mb_write_command(1):
        print("some error")
        return
    print("Closing...")

    while upper_state != target_state and lower_state != target_state:
        error, upper_state, lower_state = state_high_level()
        if error == "error":
            # The target state can no longer be reached; stop instead of hanging.
            print("some error")
            return
        sleep(0.5)

    # Mirrors the "OPEN" message printed by open_test().
    print("CLOSED")
    return

In [None]:
# Run the open/close cycle `num` times, pausing 3 s after each movement.
num = 5
for i in range(num):
    print("========= TEST {} =========".format(i))
    open_test()
    sleep(3)
    close_test()
    sleep(3)

# Протяжка пленки

In [None]:
# Read 11 holding registers starting at address 100 (film section of the notebook) and display them.
state = client.read_holding_registers(100, 11, unit=doors_and_film_system_slave_id) # start_address, count, slave_id
state.registers

### erase error

In [None]:
# Write 0 to holding registers 100 and 101 (clears the film error, per the section heading).
client.write_register(100, 0 , unit=doors_and_film_system_slave_id)
client.write_register(101, 0 , unit=doors_and_film_system_slave_id)

### film broach

In [None]:
# Write 200 to holding register 102 (film broach, per the section heading; units are not shown here).
client.write_register(102, 200 , unit=doors_and_film_system_slave_id)