-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
260 lines (221 loc) · 10.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# Networking
import requests
# Object manipulation
import json
import pandas as pd
# Timing
import time
from datetime import datetime
# Math
from scipy.interpolate import Rbf
import numpy as np
# Plotting
import matplotlib.pyplot as plt
# Geospatial
import cartopy
import cartopy.crs as ccrs
from cartopy.io.shapereader import Reader
from cartopy.feature import ShapelyFeature
from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
# Environment
import os
import sys
from dotenv import load_dotenv
load_dotenv()
# Logging
import logging
class Ubidots:
    """Collects buoy readings from the Ubidots API into a pandas DataFrame.

    Typical call order (as used by this file's entry point):
    ``get_all_devices()`` -> ``list_harvest_areas()`` -> ``get_values()``,
    after which ``self.df`` holds one row per device / variable / period with
    the columns Date, Variable, Device, Latitude, Longitude and Value.

    Args:
        from_cache: When true, the device list is read from
            ``<cache_dir>/devices.json`` instead of being requested from
            Ubidots.
    """

    # Looked up by name so the class also works when this module is imported
    # rather than executed as a script; it is the same "logger" instance the
    # script entry point obtains.
    _log = logging.getLogger("logger")

    # Seconds to wait on the Ubidots API before giving up. Without a timeout
    # a stalled connection would block the run indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, from_cache=True):
        self.df = pd.DataFrame(columns=["Date", "Variable", "Device",
                                        "Latitude", "Longitude", "Value"])
        self.API_TOKEN = os.getenv("UBIDOTS_KEY")
        self.config = "config.json"
        self.from_cache = from_cache
        self.cache_dir = "cache"
        self.devices = {}
        self.harvest_areas = []
        self.voi = ["salinity", "temperature"]  # Variables of interest

    def get_all_devices(self):
        """Populate ``self.devices`` from the cache file or from Ubidots.

        When requesting from Ubidots, only devices whose name also appears in
        the ``devices`` list of the config file are kept, and the result is
        written back to the cache file. Exits the process with status 1 if
        Ubidots does not answer with HTTP 200.
        """
        cache_path = f"{self.cache_dir}/devices.json"

        # Get device list from cache (saves a few calls to Ubidots)
        if self.from_cache:
            self._log.info(f"Requesting all devices from cache {cache_path}")
            with open(cache_path, "r") as cache:
                self.devices = json.load(cache)
            return

        # Get new device list from Ubidots and store in cache
        self._log.info("Requesting all devices from Ubidots (not cache)")
        url = "https://industrial.api.ubidots.com.au/api/v2.0/devices/"
        res = requests.get(url, headers={"X-Auth-Token": self.API_TOKEN},
                           timeout=self.REQUEST_TIMEOUT)
        if res.status_code != 200:
            self._log.error(f"Error requesting devices from ubidots: {res.status_code}, {res.text}")
            sys.exit(1)

        with open(self.config, "r") as cf:
            config = json.load(cf)
        j_res = json.loads(res.text)

        device_info = []
        for device in j_res["results"]:
            for c_device in config["devices"]:
                if device["name"] != c_device["name"]:
                    continue
                variables = self.get_device_variables(device["id"])
                device_info.append({
                    "name": device["name"],
                    "id": device["id"],
                    "api_label": device["label"],
                    "latitude": device["properties"]["_location_fixed"]["lat"],
                    "longitude": device["properties"]["_location_fixed"]["lng"],
                    "harvest_area": c_device["harvest_area"],
                    "location": c_device["location"],
                    "variables": variables
                })
        self.devices["devices"] = device_info

        # open(..., "w") does not create missing parent directories.
        os.makedirs(self.cache_dir, exist_ok=True)
        with open(cache_path, "w") as cache:
            json.dump(self.devices, cache, sort_keys=True, indent=4)

    def get_device_variables(self, device_id):
        """Return ``{variable name: variable id}`` for one device.

        Only variables whose name is listed in ``self.voi`` are included.
        Exits the process with status 1 if Ubidots does not answer with
        HTTP 200, matching the other request methods of this class (falling
        through would store ``None`` as the device's variables in the cache).
        """
        self._log.info(f"Requesting device variables from Ubidots for ID: {device_id}")
        url = f"https://industrial.api.ubidots.com.au/api/v2.0/devices/{device_id}/variables/"
        res = requests.get(url, headers={"X-Auth-Token": self.API_TOKEN},
                           timeout=self.REQUEST_TIMEOUT)
        if res.status_code != 200:
            self._log.error(f"Error requesting device variables from Ubidots. {res.status_code} {res.text}")
            sys.exit(1)

        j_res = json.loads(res.text)
        return {
            variable["name"]: variable["id"]
            for variable in j_res["results"]
            if variable["name"] in self.voi
        }

    # Gets a list of unique harvest areas from devices object
    def list_harvest_areas(self):
        """Append each not-yet-seen harvest area to ``self.harvest_areas``.

        Order of first appearance in ``self.devices["devices"]`` is kept.
        """
        self._log.info("Listing harvest areas")
        for device in self.devices["devices"]:
            if device["harvest_area"] not in self.harvest_areas:
                self.harvest_areas.append(device["harvest_area"])

    def resample(self, body, device_name, lat, long, variable):
        """Request resampled data and append per-period averages to ``self.df``.

        Args:
            body: JSON payload posted to the Ubidots resample endpoint.
            device_name: Value stored in the ``Device`` column.
            lat: Value stored in the ``Latitude`` column.
            long: Value stored in the ``Longitude`` column.
            variable: Value stored in the ``Variable`` column.

        Exits the process with status 1 if Ubidots does not answer with
        HTTP 200. Nothing is appended when fewer than 24 rows come back.
        """
        url = "https://industrial.api.ubidots.com.au/api/v1.6/data/stats/resample/"
        res = requests.post(url, headers={"X-Auth-Token": self.API_TOKEN,
                                          "Content-Type": "application/json"},
                            json=body, timeout=self.REQUEST_TIMEOUT)
        self._log.info(f"Requested resampled data from Ubidots for {device_name}. Status code = {res.status_code}")
        if res.status_code != 200:
            self._log.error(f"Error requesting resampled data from Ubidots. {res.status_code} {res.text}")
            sys.exit(1)

        rows = json.loads(res.text)["results"]
        if len(rows) < 24:
            self._log.warning(f"Only {len(rows)} resampled rows for {device_name}; skipping")
            return

        # NOTE(review): at most the first 25 rows are used, although the
        # original comment said "first 24 hours" - confirm which is intended.
        for item in rows[:25]:
            # item[0] is treated as a millisecond timestamp; rows without one
            # are stored with an empty Date.
            ts = ""
            if item[0] is not None:
                ts = datetime.fromtimestamp(int(item[0] / 1000))

            # Readings that are missing or outside [0.01, 40) are ignored.
            # NOTE(review): the same bounds are applied to every variable.
            valid = [value for value in item[1:]
                     if value is not None and 0.01 <= value < 40]
            try:
                avg = round(sum(valid) / len(valid), 2)
            except ZeroDivisionError as e:
                self._log.error(f"Values not valid: {e}")
                continue
            # Append to Dataframe
            self.df.loc[len(self.df.index)] = [ts, variable, device_name, lat, long, avg]

    def get_values(self, period="1H"):
        """Fetch resampled readings for every variable of interest.

        Devices are visited per variable, grouped by the entries of
        ``self.harvest_areas``. A device that has no id recorded for the
        variable is skipped with a warning instead of raising.

        Args:
            period: Resampling period passed through to Ubidots.
        """
        self._log.info(f"Getting latest values from Ubidots. Period = {period}")
        for var in self.voi:
            for ha in self.harvest_areas:
                for device in self.devices["devices"]:
                    if ha != device["harvest_area"]:
                        continue
                    variable_id = (device.get("variables") or {}).get(var)
                    if variable_id is None:
                        self._log.warning(f"Device {device.get('location')} has no '{var}' variable; skipping")
                        continue
                    now = time.time()
                    body = {
                        "variables": [variable_id],
                        "aggregation": "mean",
                        "join_dataframes": "true",
                        "period": period,
                        "start": int((now - 108000) * 1000),  # Roughly 30 hours
                        "end": int(now * 1000)
                    }
                    self.resample(body, device["location"],
                                  device["latitude"], device["longitude"], var)
class Map:
def __init__(self, df=pd.DataFrame(), variables = None):
self.df = df
if self.df.empty:
log.info("Map dataframe not provided reverting to stored .csv")
self.df = pd.read_csv("datasets/latest.csv")
self.extent = [150.1166, 150.1832, -35.6697, -35.7089]
self.overlay_path = "imgs/overlays/bbmap_shadow.png";
self.leases_path = "shapefiles/oyster-leases.shp"
self.output_dir = "public/clyde_river"
self.voi = variables
if self.voi == None:
info.error("Variable list not provided to map")
def generate(self, resolution=100):
x = np.linspace(self.extent[0], self.extent[1], num = resolution)
y = np.linspace(self.extent[2], self.extent[3], num = resolution)
X, Y = np.meshgrid(x, y)
for variable in self.voi:
log.info(f"Generating map of {variable}")
# Little pandas magic to pull the latest value from each variable
index = 0
for date in self.df.where(self.df["Variable"] == variable).groupby('Date'):
date_df = date[1]
log.info(f"Latest reading {date[0]}")
# Don't worry about timestamps where less than 5 bouys reported readings (edge cases)
if len(date_df) < 8:
continue
rbf_interp = Rbf(date_df["Longitude"], date_df["Latitude"],
date_df["Value"].values.astype(float), function="linear")
interpolation = rbf_interp(X, Y[::-1])
plt.rc('font', size=16)
fig, ax = plt.subplots(1, 1, figsize=(14, 8),
subplot_kw=dict(projection=ccrs.PlateCarree()))
# Could be moved to config.json
cmap = plt.cm.RdYlBu
label = "Salinity (ppt)"
v_min = 0
v_max = 40
if variable == "temperature":
cmap = plt.cm.RdYlBu_r
label = u"Temperature (\N{DEGREE SIGN}C)"
v_min = 10
v_max = 35
# Underlying interpolation
m = ax.imshow(interpolation, extent=self.extent, aspect="auto",
cmap=cmap, vmin=v_min, vmax=v_max, zorder=1)
# Map img overlay
ax.imshow(plt.imread(self.overlay_path), extent=self.extent, zorder=2)
# Oyster lease overlay
ax.add_feature(ShapelyFeature(Reader(self.leases_path).geometries(),
ccrs.PlateCarree(), facecolor='grey', alpha=0.8), zorder=3)
# Buoy overlay
ax.scatter(date_df["Longitude"], date_df["Latitude"], c="#c33c39",
edgecolor="w", linewidth=2, s=80, zorder=4)
# Gridline and map setup
ax.set_extent(self.extent)
ax.set_aspect('auto', adjustable=None)
gridLines = ax.gridlines(draw_labels=True, zorder=3)
gridLines.xformatter = LONGITUDE_FORMATTER
gridLines.yformatter = LATITUDE_FORMATTER
gridLines.right_labels = False
gridLines.top_labels = False
plt.colorbar(m, label=label, format=lambda x, _: f"{x:.0f}")
plt.title(f"{date[0]}")
plt.tight_layout()
out_dir = f"{self.output_dir}/{variable}/{index}.png"
plt.savefig(out_dir, dpi=72)
plt.close(fig)
log.info(out_dir)
index += 1
if __name__ == "__main__":
    # Script entry point: fetch the latest readings, store them as CSV and
    # render the maps.

    # Neither logging.FileHandler nor DataFrame.to_csv creates missing parent
    # directories, so make sure they exist before they are written to.
    os.makedirs("log", exist_ok=True)
    os.makedirs("datasets", exist_ok=True)

    # Bound at module level so other code in this file can refer to it as
    # `log`.
    log = logging.getLogger("logger")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler("log/debug.log"),
            logging.StreamHandler(sys.stdout)
        ]
    )

    ubi = Ubidots()
    ubi.get_all_devices()
    ubi.list_harvest_areas()
    ubi.get_values()
    ubi.df.to_csv("datasets/latest.csv", index=False)

    m = Map(df = ubi.df, variables = ubi.voi)
    m.generate()