/
pypruss.py
218 lines (192 loc) · 8.19 KB
/
pypruss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import os
import sys
import subprocess
import io
import select
import mmap
import struct
#State definitions
PRU_OFFLINE = 0
PRU_STOPPED = 1
PRU_RUNNING = 2
PRU_HALTED = 3
#Memory definitions
PRU_ICSS = 0x4A300000
PRU_ICSS_LEN = 512*1024
PRU_DRAM0 = 0x00000000
PRU_DRAM1 = 0x00002000
PRU_SRAM = 0x00010000
class PRU(object):
"""
The main class which encapsulates a PRU
:param number: The PRU number, 0/1 corresponding to PRU0 and PRU1 in the PRUSS
:type number: int
:param fw: Optional argument. If specified, load the firmware image onto the PRU with the given number
:type fw: str
"""
def __init__(self, number=0, fw=None):
if number != 0 and number != 1:
sys.exit("Invalid PRU number (Can be 0 or 1 only)")
self.number = number
self.modprobe()
self.reset()
if fw is not None:
self.load(fw)
@classmethod
def modprobe(cls):
""" Check if the Remoteproc Driver is loaded. If not, Load the driver module. """
if subprocess.call("/sbin/modprobe pru_rproc", shell=True) :
sys.exit("Probing Remoteproc Driver Failed")
@classmethod
def modunprobe(cls):
""" Remove the Remoteproc Driver module. """
if subprocess.call("/sbin/modprobe -r pru_rproc", shell=True) :
sys.exit("Failed removing the Remoteproc Driver")
def enable(self):
""" Enable/Start the PRU. """
try:
with open('/sys/class/remoteproc/remoteproc'+str(self.number+1)+'/state', 'w') as fd:
fd.write('start')
fd.close()
self.state = PRU_RUNNING
except OSError:
pass
def disable(self):
""" Disable/Stop the PRU. """
try:
with open('/sys/class/remoteproc/remoteproc'+str(self.number+1)+'/state', 'w') as fd:
fd.write('stop')
fd.close();
self.state = PRU_STOPPED
except OSError:
pass
def reset(self):
""" Reset the PRU. """
self.disable()
self.enable()
def load(self, filepath):
"""
Load the firmware image specified by @filepath and restarts the PRU
:param filepath: The firmware image to be loaded with the path to the file
:type filepath: str
"""
self.disable()
if subprocess.call('cp '+filepath+' /lib/firmware/am335x-pru'+str(self.number)+'-fw', shell=True):
print("Error loading firmware on PRU"+str(self.number))
self.enable()
# RPMsg functions
def send_msg(self, message, channel=None):
"""
Send a message to the PRU through the @channel
:param message: The message to be sent to the PRU
:type message: str
:param channel: The channel which is created by the RPMsg driver and specified in the firmware
If no channel is specified, send the message to channel 30/31 depending on the PRU number
:type channel: int
"""
if channel is None:
channel = '3'+str(self.number)
devpath = '/dev/rpmsg_pru'+str(channel)
if os.path.exists(devpath):
with open(devpath, 'w') as fd:
fd.write(message+'\n');
fd.close()
else:
print("Error Sending Message on "+channel+": Channel not found")
def get_msg(self, channel=None):
"""
Recieve the message from the PRU through the @channel
:param channel: The channel through which the PRU will send the message. It will be created by the RPMsg driver
and specified in the firmware. If no channel is specified, Recieve the message on
channel 30/31 for PRU0/PRU1 respectively
:type channel: int
"""
if channel is None:
channel = '3'+str(self.number)
devpath = '/dev/rpmsg_pru'+str(channel)
if os.path.exists(devpath):
with io.open(devpath, 'r') as fd:
data = fd.readline().strip()
fd.close()
return data
else:
print("Error Recieving Message from "+channel+": Channel not found")
def wait_for_event(self, channel=None):
"""
Wait for an event from the PRU on the @channel. The function blocks until a signal/message is recieved on the
channel
:param channel: The channel on which the event will be signaled by the PRU. If no If no channel is specified, the
wait for event on channel 30/31 for PRU0/PRU1 respectively
:type channel: int
"""
if channel is None:
channel = '3'+str(self.number)
devpath = '/dev/rpmsg_pru'+str(channel)
if os.path.exists(devpath):
with open(devpath, 'r') as fd:
fd.read(1);
fd.close()
else:
print("Error : Channel not found")
#Memory functions
def mem_writeint(self, address, data, shared=False):
"""
Write @data, and integer(4 bytes) to the memory offset specified by @address of the PRU Data/Shared Memory.
:param shared: If True, write to the PRU Shared Memory, else write to the respective Data Memories.
Defaults to False.
:param data: An int, the data to be written.
:param address: The address offset to the Respective Data Memory/Shared Memory
"""
base = PRU_SRAM if shared else (PRU_DRAM1 if self.number else PRU_DRAM0)
with open('/dev/mem', 'r+b') as fd:
pru_mem = mmap.mmap(fd.fileno(), PRU_ICSS_LEN, offset=PRU_ICSS)
pru_mem[base+address: base+address+4] = struct.pack('L', data)
pru_mem.close()
fd.close()
def mem_readint(self, address, shared=False):
"""
Read an int(4 bytes) from the memory offset specified by @address of the PRU Data/Shared Memory.
:param shared: If True, read from the PRU Shared Memory, else Read from the respective Data Memories.
Defaults to False.
:type shared: boolean
:param address: The address offset to the Respective Data Memory/Shared Memory.
:return: An int, the data read from the memory.
"""
base = PRU_SRAM if shared else (PRU_DRAM1 if self.number else PRU_DRAM0)
with open('/dev/mem', 'r+b') as fd:
pru_mem = mmap.mmap(fd.fileno(), PRU_ICSS_LEN, offset=PRU_ICSS)
data = struct.unpack('L', pru_mem[base+address: base+address+4])[0]
pru_mem.close()
fd.close()
return data
def mem_writebyte(self, address, data, shared=False):
"""
Write @data, a byte to the memory offset specified by @address of the PRU Data/Shared Memory.
:param shared: If True, write to the PRU Shared Memory, else write to the respective Data Memories.
Defaults to False.
:type shared: boolean
:param data: A byte, the Data to be written.
:param address: The address offset to the Respective Data Memory/Shared Memory.
"""
base = PRU_SRAM if shared else (PRU_DRAM1 if self.number else PRU_DRAM0)
with open('/dev/mem', 'r+b') as fd:
pru_mem = mmap.mmap(fd.fileno(), PRU_ICSS_LEN, offset=PRU_ICSS)
pru_mem[base+address: base+address+1] = struct.pack('B', data)
pru_mem.close()
fd.close()
def mem_readbyte(self, address, shared=False):
"""
Read a byte from the memory offset specified by @address of the PRU Data/Shared Memory.
:param shared: If True, read from the PRU Shared Memory, else read from the respective Data Memories.
Defaults to False.
:type shared: boolean
:param address: The address offset to the Respective Data Memory/Shared Memory.
:return: A byte, the data read from the memory.
"""
base = PRU_SRAM if shared else (PRU_DRAM1 if self.number else PRU_DRAM0)
with open('/dev/mem', 'r+b') as fd:
pru_mem = mmap.mmap(fd.fileno(), PRU_ICSS_LEN, offset=PRU_ICSS)
data = struct.unpack('B', pru_mem[base+address: base+address+1])[0]
pru_mem.close()
fd.close()
return data