-
Notifications
You must be signed in to change notification settings - Fork 167
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
scripts: add an example BMP085 run-script file
- Loading branch information
1 parent
a6fc0cf
commit 7109f69
Showing
1 changed file
with
166 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
""" | ||
this serves as an example script. connect a BMP085 to the glasgow, and run the | ||
following command: | ||
glasgow run-script i2c-bmp085.py i2c-initiator -V 3.3 --pin-scl 0 --pin-sda 1 | ||
""" | ||
import struct | ||
import asyncio | ||
|
||
class BMP085: | ||
""" | ||
interface class for the Bosch BMP085 temperature and pressure sensor | ||
""" | ||
CAL_REGS = { | ||
"AC1": ( 0xAA, "h" ), # int16 | ||
"AC2": ( 0xAC, "h" ), # int16 | ||
"AC3": ( 0xAE, "h" ), # int16 | ||
"AC4": ( 0xB0, "H" ), # uint16 | ||
"AC5": ( 0xB2, "H" ), # uint16 | ||
"AC6": ( 0xB4, "H" ), # uint16 | ||
"B1": ( 0xB6, "h" ), # int16 | ||
"B2": ( 0xB8, "h" ), # int16 | ||
"MB": ( 0xBA, "h" ), # int16 | ||
"MC": ( 0xBC, "h" ), # int16 | ||
"MD": ( 0xBE, "h" ), # int16 | ||
} | ||
|
||
def __init__(self, iface, addr=0x77, _debug=False): | ||
self.iface = iface | ||
self.addr = addr | ||
|
||
# set this to True to use the values given in the datasheet | ||
# this will also trigger an assertion if read_temperature() or | ||
# read_pressure() try to produce the wrong values | ||
self._debug = _debug | ||
|
||
|
||
async def _read(self, reg, size): | ||
await self.iface.write(self.addr, [ reg ], stop=False) | ||
return await self.iface.read(self.addr, size, stop=True) | ||
|
||
async def _read_16bit(self, reg, fmt="H"): | ||
raw = await self._read(reg, 2) | ||
return struct.unpack(f">{fmt}", raw)[0] | ||
|
||
async def _write(self, reg, data): | ||
await self.iface.write(self.addr, [ reg, *data ], stop=True) | ||
|
||
def _calc_b5(self, ut): | ||
x1 = ((ut - self._cal["AC6"]) * self._cal["AC5"]) >> 15 | ||
x2 = (self._cal["MC"] << 11) // (x1 + self._cal["MD"]) | ||
return x1 + x2 | ||
|
||
async def _read_calibration(self): | ||
if self._debug: | ||
return { | ||
"AC1": 408, | ||
"AC2": -72, | ||
"AC3": -14383, | ||
"AC4": 32741, | ||
"AC5": 32757, | ||
"AC6": 23153, | ||
"B1": 6190, | ||
"B2": 4, | ||
"MB": -32768, | ||
"MC": -8711, | ||
"MD": 2868, | ||
} | ||
return { k: await self._read_16bit(*v) for k,v in self.CAL_REGS.items() } | ||
|
||
async def _read_temperature_raw(self): | ||
if self._debug: | ||
return 27898 | ||
await self._write(0xf4, [ 0x2e ]) | ||
await asyncio.sleep(0.01) # max conversion time ~4.5ms | ||
return await self._read_16bit(0xf6) | ||
|
||
async def _read_pressure_raw(self, oversample=1): | ||
if self._debug: | ||
return 23843 | ||
assert oversample in range(0, 3) | ||
await self._write(0xf4, [ 0x34 | (oversample << 6) ]) | ||
await asyncio.sleep(0.05) # max conversion time ~25.5ms for osrs=3 | ||
raw = await self._read(0xf6, 3) | ||
b = struct.unpack(">BBB", raw) | ||
raw = (b[0] << 16) | (b[1] << 8) | b[0] | ||
return raw >> (8 - oversample) | ||
|
||
|
||
async def startup(self): | ||
""" | ||
this must be called and awaited before the instance can be used | ||
""" | ||
self._cal = await self._read_calibration() | ||
|
||
async def exists(self): | ||
""" | ||
check that the device exists on the bus | ||
""" | ||
return self.addr in await self.iface.scan() | ||
|
||
async def read_temperature(self): | ||
""" | ||
read the current temperature, returned in degrees Celcius | ||
""" | ||
ut = await self._read_temperature_raw() | ||
b5 = self._calc_b5(ut) | ||
temp_c = ((b5 + 8) >> 4) / 10 | ||
if self._debug: | ||
assert temp_c == 15.0 | ||
return temp_c # Celcius | ||
|
||
async def read_pressure(self, oversample=1): | ||
""" | ||
read the current atmospheric pressure, returned in milibar | ||
""" | ||
if self._debug: | ||
oversample = 0 | ||
ut = await self._read_temperature_raw() | ||
up = await self._read_pressure_raw(oversample) | ||
b5 = self._calc_b5(ut) | ||
b6 = b5 - 4000 | ||
x1 = (self._cal["B2"] * ((b6 ** 2) >> 12)) >> 11 | ||
x2 = (self._cal["AC2"] * b6) >> 11 | ||
x3 = x1 + x2 | ||
b3 = (((self._cal["AC1"] * 4 + x3) << oversample) + 2) // 4 | ||
x1 = (self._cal["AC3"] * b6) >> 13 | ||
x2 = (self._cal["B1"] * (b6 ** 2 >> 12)) >> 16 | ||
x3 = ((x1 + x2) + 2) >> 2 | ||
b4 = (self._cal["AC4"] * (x3 + 32768)) >> 15 | ||
b7 = ((up - b3) * 50000) >> oversample | ||
if b7 < 0x80000000: | ||
p = (b7 * 2) // b4 | ||
else: | ||
p = (b7 // b4) * 2 | ||
x1 = (p >> 8) ** 2 | ||
x1 = (x1 * 3038) >> 16 | ||
x2 = (-7357 * p) >> 16 | ||
pres_p = p + ((x1 + x2 + 3791) >> 4) | ||
if self._debug: | ||
assert pres_p == 69964 | ||
pres_mbar = pres_p / 100 | ||
return pres_mbar # milibar / hPa | ||
|
||
async def read_altitude(self): | ||
""" | ||
estimate the current altitude, returned in meters | ||
""" | ||
pres_mbar = await self.read_pressure() | ||
alt_m = 44330 * (1 - ((pres_mbar / 1013.25) ** (1 / 5.255))) | ||
return alt_m # meters | ||
|
||
s = BMP085(iface) | ||
await s.startup() | ||
|
||
t = await s.read_temperature() | ||
p = await s.read_pressure() | ||
a = await s.read_altitude() | ||
|
||
# power down the sensor after taking the readings | ||
await device.set_voltage("AB", 0) | ||
|
||
print(f"Temperature: {t:.2f}'C") | ||
print(f"Pressure: {p:.2f} mbar") | ||
print(f"Altitude: {a:.1f} meters") |