forked from swift-nav/piksi_firmware
/
flash.py
206 lines (185 loc) · 7.54 KB
/
flash.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#import wx
import struct
import time
from intelhex import IntelHex
from threading import Thread, Event
import sys
from math import ceil
PAGESIZE = 256
SECTORSIZE = 64*1024
FLASHSIZE = 1024*1024
# Maximum number of flash operations allowed to be pending in the STM at once.
PENDING_COMMANDS_LIMIT = 5
def roundup_multiple(x, multiple):
    """Return x rounded up to the nearest multiple of `multiple`.

    A value that is already an exact multiple is returned unchanged.
    """
    remainder = x % multiple
    if remainder == 0:
        return x
    return x + multiple - remainder
def rounddown_multiple(x, multiple):
    """Return x rounded down to the nearest multiple of `multiple`.

    A value that is already an exact multiple is returned unchanged.
    """
    remainder = x % multiple
    if remainder == 0:
        return x
    return x - remainder
class Flash(Thread):
    """Worker thread that queues flash commands and sends them over a link.

    Commands are scheduled with read(), erase_sector() and write(). run()
    forwards them through ``link.send_message`` while keeping the number of
    unacknowledged operations below PENDING_COMMANDS_LIMIT. Completion is
    tracked through the callbacks registered on the link for message IDs
    0xF0 (operation done) and 0xF1 (read data).
    """

    # Class-level defaults are kept for backward compatibility; __init__
    # rebinds every mutable one so that instances never share state.
    _flash_ready = Event()
    _command_queue = []
    _wants_to_exit = False
    _commands_sent = 0
    _done_callbacks_received = 0
    _read_callbacks_received = 0
    rd_cb_addrs = []
    rd_cb_lens = []
    rd_cb_data = []
    last_data = []

    def __init__(self, link):
        """Create the worker and register its callbacks on `link`.

        `link` must provide ``add_callback(msg_id, func)`` and
        ``send_message(msg_id, payload)``.
        """
        super(Flash, self).__init__()
        # Per-instance state: the class-level lists/Event would otherwise be
        # shared (and mutated) by every Flash object.
        self._flash_ready = Event()
        self._flash_ready.set()
        self._command_queue = []
        self.rd_cb_addrs = []
        self.rd_cb_lens = []
        self.rd_cb_data = []
        self.last_data = []
        self.link = link
        self.link.add_callback(0xF0, self._flash_done_callback)
        self.link.add_callback(0xF1, self._flash_read_callback)

    def flash_operations_left(self):
        """Return the number of operations still queued or unacknowledged."""
        in_flight = (self._commands_sent
                     - self._done_callbacks_received
                     - self._read_callbacks_received)
        return len(self._command_queue) + in_flight

    def _flash_done_callback(self, data):
        """Link callback for message 0xF0: count one finished operation."""
        self._flash_ready.set()
        self._done_callbacks_received += 1

    def _flash_read_callback(self, data):
        """Link callback for message 0xF1: record one chunk of read data.

        `data` is laid out as 3 bytes of big-endian address, 1 byte of
        length, then `length` bytes of payload.

        Raises struct.error if `data` does not match that layout; in that
        case none of the rd_cb_* lists are modified.
        """
        self._read_callbacks_received += 1
        # The address is a 3-byte big-endian unsigned int; pad it on the left
        # with a zero byte so it can be unpacked as a 4-byte one.
        addr = struct.unpack('>I', b'\x00' + data[0:3])[0]
        # Slice (not index) so this works for both str and bytes input.
        length = struct.unpack('B', data[3:4])[0]
        payload = struct.unpack('%dB' % length, data[4:])
        # Only touch shared state once the whole message has parsed, so a
        # malformed message cannot leave the three lists out of step.
        self.rd_cb_addrs.append(addr)
        self.rd_cb_lens.append(length)
        self.rd_cb_data.extend(payload)

    def _acquire_flash(self):
        """Block until the ready event is set, then clear it."""
        self._flash_ready.wait()
        self._flash_ready.clear()

    def _schedule_command(self, cmd, addr):
        """Append (method name, argument tuple) to the command queue."""
        self._command_queue.append((cmd, addr))

    def stop(self):
        """Ask run() to return after its current iteration."""
        self._wants_to_exit = True

    def run(self):
        """Dispatch queued commands until stop() is called.

        Raises Exception if more callbacks than commands have been counted.
        """
        while not self._wants_to_exit:
            if self.flash_operations_left() < 0:
                raise Exception('self.flash_operations_left() returns less than 0')
            in_flight = (self._commands_sent
                         - self._done_callbacks_received
                         - self._read_callbacks_received)
            if self._command_queue and in_flight < PENDING_COMMANDS_LIMIT:
                cmd, args = self._command_queue[0]
                getattr(self, cmd)(*args)
                # Remove the entry only after it has been sent and counted:
                # flash_operations_left() then never under-reports, and
                # pop(0) cannot drop an entry appended concurrently by
                # _schedule_command (rebinding to queue[1:] could).
                self._command_queue.pop(0)
            else:
                # Nothing to dispatch, or the pending limit is reached:
                # yield briefly instead of spinning.
                time.sleep(0.001)

    def read_cb_sanity_check(self):
        """Check that the recorded read callbacks are self-consistent.

        Verifies that the addresses received run continuously from the first
        one (each address equals the previous address plus the previous
        length) and that the total of the lengths matches the amount of data
        received. Passes trivially when nothing has been read.

        Raises Exception if either check fails.
        """
        expected_addrs = []
        if self.rd_cb_addrs:
            expected_addrs.append(self.rd_cb_addrs[0])
            for length in self.rd_cb_lens[:-1]:
                expected_addrs.append(expected_addrs[-1] + length)
        if self.rd_cb_addrs != expected_addrs:
            raise Exception('Addresses returned in read callback appear discontinuous')
        if sum(self.rd_cb_lens) != len(self.rd_cb_data):
            raise Exception('Length of read data does not match read callback lengths')

    def read(self, addr, length):
        """Schedule a read of `length` bytes starting at `addr`."""
        self._schedule_command('_read', (addr, length))

    def _read(self, addr, length):
        """Send a read request (message 0xF1) for `length` bytes at `addr`."""
        msg_buf = struct.pack("<II", addr, length)
        # The accounting expects one read callback per 16 bytes requested.
        self._commands_sent += int(ceil(float(length) / 16))
        self.link.send_message(0xF1, msg_buf)

    def erase_sector(self, length):
        """Schedule an erase of the sector at the given address.

        The parameter is named `length` for backward compatibility, but its
        value is forwarded to _erase_sector as the sector address.
        """
        self._schedule_command('_erase_sector', (length,))

    def _erase_sector(self, addr):
        """Send an erase request (message 0xF2) for the sector at `addr`."""
        msg_buf = struct.pack("<I", addr)
        self._commands_sent += 1
        self.link.send_message(0xF2, msg_buf)

    def write(self, addr, data):
        """Schedule a write of the byte string `data` starting at `addr`."""
        self._schedule_command('_write', (addr, data))

    def _write(self, addr, data):
        """Send `data` to `addr` as write messages (0xF0) of at most 128 bytes.

        Each message is a "<IB" header (address, payload length) followed by
        the payload. At least one message is always sent, even for empty data.
        """
        MAXSIZE = 128
        while len(data) > MAXSIZE:
            chunk = data[:MAXSIZE]
            msg_header = struct.pack("<IB", addr, len(chunk))
            self._commands_sent += 1
            self.link.send_message(0xF0, msg_header + chunk)
            addr += MAXSIZE
            data = data[MAXSIZE:]
        msg_header = struct.pack("<IB", addr, len(data))
        self._commands_sent += 1
        self.link.send_message(0xF0, msg_header + data)

    def _wait_for_operations(self, poll_interval=0.2):
        """Poll until flash_operations_left() is 0, printing progress."""
        ops_left = self.flash_operations_left()
        while ops_left != 0:
            # Pad the count so a shorter number overwrites stale digits left
            # on the line by a previous, longer one.
            sys.stdout.write("\rFlash operations left = %-8d" % ops_left)
            sys.stdout.flush()
            time.sleep(poll_interval)
            ops_left = self.flash_operations_left()
        sys.stdout.write("\rFlash operations left = %d\n" % ops_left)
        sys.stdout.flush()

    def write_ihx(self, filename):
        """Erase, program and verify flash from the Intel HEX file `filename`.

        Erases every sector and writes every 128-byte chunk touched by the
        file's address range, reads the range back, and compares it with the
        file contents. The worker thread must be running for the queued
        operations to be sent.

        Raises Exception if the read-back data is inconsistent or differs
        from the hex file.
        """
        WRITE_SIZE = 128
        ihx = IntelHex(filename)
        t1 = time.time()
        start_addr = ihx.minaddr()
        # maxaddr() is the last address that holds data (inclusive), so the
        # exclusive end of the range is one past it.
        end_addr = ihx.maxaddr() + 1

        # Write hex file to flash
        print("Writing hex to flash...")
        min_sector = rounddown_multiple(start_addr, SECTORSIZE)
        max_sector = roundup_multiple(end_addr, SECTORSIZE)
        for addr in range(min_sector, max_sector, SECTORSIZE):
            self.erase_sector(addr)
        min_page = rounddown_multiple(start_addr, WRITE_SIZE)
        max_page = roundup_multiple(end_addr, WRITE_SIZE)
        for addr in range(min_page, max_page, WRITE_SIZE):
            self.write(addr, ihx.tobinstr(start=addr, size=WRITE_SIZE))
        self._wait_for_operations()
        t2 = time.time()
        print("Finished writing hex to flash, took %.2f seconds" % (t2 - t1))

        # Read bytes back from flash
        print("Reading hex back from flash...")
        # Discard results of any earlier read so that only this read-back is
        # verified. Nothing is in flight here, so no callback is appending.
        self.rd_cb_addrs = []
        self.rd_cb_lens = []
        self.rd_cb_data = []
        # Read exactly the range that is compared against the file below.
        self.read(start_addr, end_addr - start_addr)
        self._wait_for_operations()
        t3 = time.time()
        print("Finished reading hex from flash, took %.2f seconds" % (t3 - t2))

        # Check that bytes read back from flash match hex file
        print("Verifying that bytes read back from flash match hex file...")
        self.read_cb_sanity_check()
        expected = list(ihx.tobinarray(start=start_addr,
                                       size=end_addr - start_addr))
        if self.rd_cb_data != expected:
            raise Exception('Data read from flash does not match configuration file')
        print("Data from flash matches hex file, update successful")

        # Finished, report execution time
        print("Total time was %.2f seconds" % (t3 - t1))