forked from skagmo/esmart_mppt
-
Notifications
You must be signed in to change notification settings - Fork 1
/
esmart.py
executable file
·129 lines (106 loc) · 3.64 KB
/
esmart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Library for communicating with eSmart 3 MPPT charger
# skagmo.com, 2018
# modified for data export
import time, serial #, struct, socket, requests
# States
STATE_START = 0
STATE_DATA = 1
REQUEST_MSG0 = b"\xaa\x01\x01\x01\x00\x03\x00\x00\x1e\x32"
LOAD_OFF = b"\xaa\x01\x01\x02\x04\x04\x01\x00\xfe\x13\x38"
LOAD_ON = b"\xaa\x01\x01\x02\x04\x04\x01\x00\xfd\x13\x39"
DEVICE_MODE = ["IDLE", "CC", "CV", "FLOAT", "START"]
class esmart:
def __init__(self):
self.state = STATE_START
self.data = []
self.fields = {}
self.callback = False
self.port = ""
self.timeout = 0
self.name = ""
self.ext_temp_name = "bat"
def __del__(self):
self.close()
def set_callback(self, function):
self.callback = function
def set_name(self, name):
self.name = name
def set_ext_temp_name(self, name):
self.ext_temp_name = name
def open(self, port):
self.ser = serial.Serial(port,9600,timeout=0.1)
self.port = port
def close(self):
try:
self.ser.close()
self.ser = False
except AttributeError:
pass
def send(self, pl):
self.ser.write(self.pack(pl))
def parse(self, data):
for c in data:
if (self.state == STATE_START):
if (c == 0xaa):
# Start character detected
self.state = STATE_DATA
self.data = []
self.target_len = 255
#else:
#print c
elif (self.state == STATE_DATA):
self.data.append(c)
# Received enough of the packet to determine length
if (len(self.data) == 5):
self.target_len = 6 + self.data[4]
# Received whole packet
if (len(self.data) == self.target_len):
self.state = STATE_START
#print " ".join("{:02x}".format(ord(c)) for c in self.data)
# Source 3 is MPPT device
if (self.data[2] == 3):
msg_type = self.data[3]
# Type 0 packet contains most data
if (self.data[3] == 0):
self.fields = {}
self.fields['chg_mode'] = int.from_bytes(self.data[7:9], byteorder='little')
self.fields['pv_volt'] = int.from_bytes(self.data[9:11], byteorder='little') / 10.0
self.fields['bat_volt'] = int.from_bytes(self.data[11:13], byteorder='little') / 10.0
self.fields['chg_cur'] = int.from_bytes(self.data[13:15], byteorder='little') / 10.0
self.fields['load_volt'] = int.from_bytes(self.data[17:19], byteorder='little') / 10.0
self.fields['load_cur'] = int.from_bytes(self.data[19:21], byteorder='little') / 10.0
self.fields['chg_power'] = int.from_bytes(self.data[21:23], byteorder='little')
self.fields['load_power']= int.from_bytes(self.data[23:25], byteorder='little')
self.fields['ext_temp'] = self.data[25] if self.data[25] < 200 else self.data[25] - 256
self.fields['int_temp'] = self.data[27] if self.data[27] < 200 else self.data[27] - 256
self.fields['soc'] = self.data[29]
self.fields['co2_gram'] = int.from_bytes(self.data[33:35], byteorder='little')
self.fields['name'] = self.name
self.fields['ext_temp_name'] = self.ext_temp_name
self.callback(self.fields)
def tick(self):
try:
while (self.ser.inWaiting()):
self.parse(self.ser.read(100))
# Send poll packet to request data every x seconds
if (time.time() - self.timeout) > 1:
self.ser.write(REQUEST_MSG0)
self.timeout = time.time()
#time.sleep(0.5)
#self.ser.write(LOAD_OFF)
except IOError:
print("Serial port error, fixing")
self.ser.close()
opened = 0
while not opened:
try:
self.ser = serial.Serial(self.port,38400,timeout=0)
#time.sleep(0.1)
if self.ser.read(100):
opened = 1
else:
self.ser.close()
except serial.serialutil.SerialException:
time.sleep(0.5)
self.ser.close()
print("Error fixed")