-
Notifications
You must be signed in to change notification settings - Fork 0
/
senec_api.py
198 lines (149 loc) · 7.11 KB
/
senec_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""All code contacting the senec api"""
import json
import enum
import datetime
import requests
from dateutil.relativedelta import relativedelta
# Define the base URL
BASE_URL = "https://app-gateway-prod.senecops.com/v1/senec"
class TimePeriod(enum.Enum):
"""The allowed time period values of senec api"""
HOUR = "STUNDE"
DAY = "TAG"
MONTH = "MONAT"
YEAR = "JAHR"
def login(username, password):
"""Login to senec and return the generated token"""
# Define the login URL
login_url = f"{BASE_URL}/login"
# Define the payload data as a dictionary
payload_data = {"username": username, "password": password}
# Convert the payload dictionary to JSON format
payload_json = json.dumps(payload_data)
# Define headers
headers = {"Content-Type": "application/json"}
try:
# Send a POST request to the login URL with the JSON payload
response = requests.post(login_url, data=payload_json, headers=headers, timeout=10)
# Check if the login request was successful (status code 200)
if response.status_code == 200:
# Extract the token from the response JSON
response_data = response.json()
access_token = response_data.get("token")
print("Login successful! Token: " + access_token)
else:
print(f"Login failed with status code: {response.status_code}")
except requests.exceptions.RequestException as error:
print(f"An error occurred during login: {error}")
return access_token
def get_devices(access_token):
"""Get the list of senec devices"""
if access_token is None:
print("Please login first.")
return None
# Define the devices URL
devices_url = f"{BASE_URL}/anlagen"
# Define headers with the authorization token
headers = {"authorization": access_token}
try:
# Send a GET request to the devices URL with the authorization header
response = requests.get(devices_url, headers=headers, timeout=10)
# Check if the request was successful (status code 200)
if response.status_code == 200:
devices_data = response.json()
return devices_data
print(f"Failed to retrieve devices with status code: {response.status_code}")
except requests.exceptions.RequestException as error:
print(f"An error occurred while retrieving devices: {error}")
return None
def get_dashboard(access_token, device_id):
"""get the data from dashboard api endpoint and print it to console"""
if access_token is None:
print("Please login first.")
return
# Define the dashboard URL
dashboard_url = f"{BASE_URL}/anlagen/{device_id}/dashboard"
# Define headers with the authorization token
headers = {"authorization": access_token}
try:
# Send a GET request to the dashboard URL with the authorization header
response = requests.get(dashboard_url, headers=headers, timeout=10)
# Check if the request was successful (status code 200)
if response.status_code == 200:
dashboard_data = response.json()
# Write the response data to a JSON file
with open("senec_dashboard_data.json", "w", encoding="utf-8") as json_file:
json.dump(dashboard_data, json_file, indent=4)
else:
print(f"Failed to retrieve dashboard with status code: {response.status_code}")
except requests.exceptions.RequestException as error:
print(f"An error occurred while retrieving dashboard data: {error}")
def get_zeitverlauf_data(access_token, device_id, period, datetime_obj, timezone):
"""get the data from zeitverlauf api endpoint and print it to a json"""
# Define the API endpoint
endpoint = f"anlagen/{device_id}/zeitverlauf"
# Format the datetime object as a string
datetime_str = datetime_obj.strftime("%Y-%m-%dT%H:%M:%SZ")
# Define query parameters
params = {"periode": period, "timezone": timezone, "before": datetime_str}
headers = {"authorization": access_token}
try:
# Send a GET request to the API endpoint with the query parameters
response = requests.get(f"{BASE_URL}/{endpoint}", params=params, headers=headers, timeout=20)
# Check if the request was successful (status code 200)
if response.status_code == 200:
zeitverlauf_data = response.json()
#print("Zeitverlauf Data:")
#print(zeitverlauf_data)
# Write the response data to a JSON file
with open("senec_time_aggregation_data.json", "w", encoding="utf-8") as json_file:
json.dump(zeitverlauf_data, json_file, indent=4)
else:
print(f"Request failed with status code: {response.status_code}")
print("Request details:")
print(f"URL: {response.request.url}")
print(f"Headers: {response.request.headers}")
print(f"Body: {response.request.body}")
except requests.exceptions.RequestException as error:
print(f"An error occurred during the API call: {error}")
def get_statistik_data(access_token, device_id, timezone, locale):
"""get the data from statistik api endpoint and print it to a json"""
# Define the API endpoint
endpoint = f"anlagen/{device_id}/statistik"
headers = {"authorization": access_token}
date = datetime.datetime.now()
date = datetime.datetime(date.year, 12, 31)
aggregations = []
try:
has_data = True
while has_data:
# Define query parameters
params = {"periode": TimePeriod.YEAR.value, "timezone": timezone, "datum": date.strftime("%Y-%m-%d"), "locale": locale}
# Send a GET request to the API endpoint with the query parameters
response = requests.get(f"{BASE_URL}/{endpoint}", params=params, headers=headers, timeout=20)
# Check if the request was successful (status code 200)
if response.status_code == 200:
zeitverlauf_data = response.json()
#print("Zeitverlauf Data:")
#print(zeitverlauf_data)
aggregation = zeitverlauf_data.get("aggregation")
if aggregation:
# if aggregation has data, collect it
aggregations.append(aggregation)
# try again with one year before
date = date - relativedelta(years=1)
else:
# If not, break the while
has_data = False
else:
print(f"Request failed with status code: {response.status_code}")
print("Request details:")
print(f"URL: {response.request.url}")
print(f"Headers: {response.request.headers}")
print(f"Body: {response.request.body}")
has_data = False
# Write the response data to a JSON file
with open("senec_statistic_data.json", "w", encoding="utf-8") as json_file:
json.dump(aggregations, json_file, indent=4)
except requests.exceptions.RequestException as error:
print(f"An error occurred during the API call: {error}")