-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_fetcher.py
48 lines (46 loc) · 1.78 KB
/
data_fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Standard Library
from datetime import datetime, timedelta
# Project Files
from iot_time_series import IoTTimeSeries
class DataFetcher(object):
    """ Fetch data from the MindSphere IoT Time Series Service.

    The query window is computed once, when the instance is created:
    it ends at the construction time (UTC) and starts ``days`` days
    earlier. Every call to ``fetch_data`` reuses that same window.
    """

    def __init__(self, auth_token, asset_id, aspect, days=5):
        """ Store the query target and pre-compute the time window.

        Args:
            auth_token: Token handed to ``IoTTimeSeries.read`` on each call.
            asset_id: Identifier of the asset to read from.
            aspect: Name of the aspect to read from.
            days: Length of the look-back window in days; passed straight
                to ``timedelta(days=...)``. Defaults to 5, the value that
                was previously hard-coded.
        """
        self.auth_token = auth_token
        self.asset_id = asset_id
        self.aspect = aspect
        # Note that 2000 is the maximum number of time series data points
        # to be retrieved, so keep the window reasonably short.
        self.fmt = '%Y-%m-%dT%H:%M:%S.%f'
        self.now = datetime.utcnow()
        self.start = self._to_timestamp(self.now - timedelta(days=days))
        self.end = self._to_timestamp(self.now)

    def _to_timestamp(self, moment):
        """ Format a datetime as 'YYYY-mm-ddTHH:MM:SS.sssZ'. """
        # '%f' renders six microsecond digits; dropping the last three
        # leaves milliseconds, as the required format demands.
        return moment.strftime(self.fmt)[:-3] + 'Z'

    def _read(self, **params):
        """ Issue one read for this asset/aspect and return the response. """
        return IoTTimeSeries().read(
            self.auth_token,
            self.asset_id,
            self.aspect,
            **params
        )

    def fetch_data(self, variable):
        """ Read the timeseries data of the variable.

        Args:
            variable: Name of the variable to select; also the key under
                which its value is looked up in each returned data point.

        Returns:
            A ``(timestamps, values)`` tuple of two parallel lists. Both
            are empty when the windowed read does not answer with status
            200, or when neither read yields any data points.

        Raises:
            KeyError: If a returned data point lacks '_time' or the
                requested variable.
        """
        response = self._read(_from=self.start, to=self.end, select=variable)
        if response.status_code != 200:
            return ([], [])
        data = response.json()

        # Read the latest record if the window has no data points.
        if not data:
            response = self._read(select=variable)
            data = response.json() if response.status_code == 200 else None

        # Normalize "nothing usable" to an empty list so the extraction
        # below never iterates over None.
        data = data or []

        # Read timestamps and values.
        timestamps = [data_point['_time'] for data_point in data]
        values = [data_point[variable] for data_point in data]
        return (timestamps, values)