In [65]:
import os, time
from influxdb_client import InfluxDBClient, QueryApi
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.exceptions import InfluxDBError
import time

In [66]:
token = os.environ.get("INFLUXDB_TOKEN")
org = "th-koeln"
url = "http://localhost:8086"
bucket = "iot_telemetry_data"

# Queries

In [67]:
average_humidity_per_device_query = """
from(bucket: "iot_telemetry_data")
  |> range(start: 0)
  |> filter(fn: (r) =>
      r._measurement == "iot_telemetry" and
      r._field == "humidity"
  )
  |> pivot(
      rowKey:   ["device"],
      columnKey: ["_field"],
      valueColumn: "_value"
  )
  |> mean(column: "humidity")
  |> sort(columns: ["device"])
"""

In [68]:
highest_temp_query = """
from(bucket: "iot_telemetry_data")
  |> range(start: 0)
  |> filter(fn: (r) =>
      r._measurement == "iot_telemetry" and
      r._field == "temp"
  )
  |> group()
  |> max(column: "_value")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
"""

In [69]:
worst_air_quality_per_device_query = """
from(bucket: "iot_telemetry_data")
  |> range(start: 0)
  |> filter(fn: (r) =>
      r._measurement == "iot_telemetry" and
      (r._field == "co" or r._field == "lpg" or r._field == "smoke")
  )
  |> pivot(
      rowKey:   ["device", "_time"],
      columnKey:["_field"],
      valueColumn: "_value"
  )
  |> map(fn: (r) => ({
      device: r.device,
      air_quality_score: r.co + r.lpg + r.smoke
  }))
  |> group(columns:["device"])
  |> sum(column: "air_quality_score")
  |> sort(columns: ["air_quality_score"], desc: true)
"""

In [70]:
hourly_temp_hum_query = """
data = from(bucket: "iot_telemetry_data")
  |> range(start: 2002-04-07T00:00:00Z, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "iot_telemetry")
  |> filter(fn: (r) =>
    r["device"] == "00:0f:00:70:91:0a" or
    r["device"] == "1c:bf:ce:15:ec:4d" or
    r["device"] == "eb:b8:27:bf:9d:51"
  )
  |> filter(fn: (r) => r["_field"] == "temp" or r["_field"] == "humidity")


mean_temp = data
  |> filter(fn: (r) => r["_field"] == "temp")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> drop(columns: ["_measurement", "_field"]) // Nicht mehr benötigte Spalten entfernen


max_humidity = data
  |> filter(fn: (r) => r["_field"] == "humidity")
  |> aggregateWindow(every: 1h, fn: max, createEmpty: false)
  |> drop(columns: ["_measurement", "_field"]) // Nicht mehr benötigte Spalten entfernen


// Ergebnisse zusammenführen und pivotieren
join(tables: {temp: mean_temp, humidity: max_humidity}, on: ["_time", "_stop", "_start", "device"])
  |> rename(columns: { _value_temp: "mean_temp", _value_humidity: "max_humidity"}) // Spalten umbenennen für Klarheit
  |> sort(columns: ["_time", "device"]) // Optional: Sortieren für bessere Lesbarkeit
"""

In [71]:
temperature_rate_of_change_per_minute_by_device = """
from(bucket: "iot_telemetry_data")
  |> range(start: 2020-07-12T00:00:00Z, stop: 2020-07-19T23:59:59Z)
  |> filter(fn: (r) => r["_measurement"] == "iot_telemetry")
  |> filter(fn: (r) => r["_field"] == "temp")
  |> group(columns: ["device"])
  |> derivative(unit: 1m, nonNegative: false)
  |> yield(name: "temperature_rate_of_change_per_minute")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
"""

In [72]:
times_stamps_query = '''
from(bucket: "iot_telemetry_data")
  |> range(start: 0)
  |> filter(fn: (r) => r._measurement == "iot_telemetry")
  |> keep(columns: ["_time"])
'''

In [73]:
avg_temp_humidity_ratio_query = """
from(bucket: "iot_telemetry_data")
    |> range(start: 2020-07-14T00:01:00Z, stop: 2020-07-19T03:01:00Z)
    |> filter(fn: (r) => r._measurement == "iot_telemetry" and (r._field == "temp" or r._field == "humidity"))
    |> pivot(rowKey:["_time", "device"], columnKey: ["_field"], valueColumn: "_value")
    |> filter(fn: (r) => exists r.temp and exists r.humidity and r.humidity != 0.0)
    |> map(fn: (r) => ({ r with temp_humidity_ratio: r.temp / r.humidity }))
    |> mean(column: "temp_humidity_ratio")
    |> yield(name: "avg_temp_humidity_ratio")
"""

In [74]:
average_humidity_per_device_query_complex = """
from(bucket: "iot_telemetry_data")
  |> range(start: 0)
  |> filter(fn: (r) =>
      r._measurement == "iot_telemetry" and
      r._field == "humidity"
  )
  |> group(columns: ["device"])
  |> reduce(fn: (r, acc) => ({
      device: r.device,
      sum: acc.sum + r._value,
      count: acc.count + 1
  }), identity: {sum: 0.0, count: 0})
  |> map(fn: (r) => ({ r with avg_humidity: r.sum / float(v: r.count) }))
  |> keep(columns: ["device", "avg_humidity"])
  |> sort(columns: ["device"])
"""

In [75]:
highest_temp_query_complex = """
from(bucket: "iot_telemetry_data")
  |> range(start: 0)
  |> filter(fn: (r) =>
      r._measurement == "iot_telemetry" and
      r._field == "temp"
  )
  |> aggregateWindow(every: 1h, fn: max, createEmpty: false)
  |> group()
  |> max(column: "_value")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
"""

In [76]:
import pandas as pd

try:
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = QueryApi(influxdb_client=client)
    result = pd.DataFrame(query_api.query_data_frame(times_stamps_query))
    print(result.min())
    print(result.max())
except InfluxDBError as e:
    print('Error: ', e)


The result will not be shaped to optimal processing by pandas.DataFrame. Use the pivot() function by:

    
from(bucket: "iot_telemetry_data")
  |> range(start: 0)
  |> filter(fn: (r) => r._measurement == "iot_telemetry")
  |> keep(columns: ["_time"])
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")



For more info see:
    - https://docs.influxdata.com/resources/videos/pivots-in-flux/
    - https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/
    - https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/



result                             _result
table                                    0
_time     2020-07-12 00:01:34.385974+00:00
dtype: object
result                             _result
table                                    0
_time     2020-07-20 00:03:37.264312+00:00
dtype: object


# Performance Analysis

In [77]:
query_times = {}

In [78]:
try:
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = QueryApi(influxdb_client=client)
    start_time = time.time()
    result = query_api.query_data_frame(average_humidity_per_device_query)
    end_time = time.time()
    print(f"Query took {end_time - start_time} seconds.")
    query_times['Average Humidity'] = (end_time - start_time)
except InfluxDBError as e:
    print('Error: ', e)

Query took 0.16396498680114746 seconds.


In [79]:
try:
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = QueryApi(influxdb_client=client)
    start_time = time.time()
    result = query_api.query_data_frame(highest_temp_query)
    end_time = time.time()
    print(f"Query took {end_time - start_time} seconds.")
    query_times['Highest Temp'] = (end_time - start_time)
except InfluxDBError as e:
    print('Error: ', e)

Query took 0.01749563217163086 seconds.


In [80]:
try:
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = QueryApi(influxdb_client=client)
    start_time = time.time()
    result = query_api.query_data_frame(worst_air_quality_per_device_query)
    end_time = time.time()
    print(f"Query took {end_time - start_time} seconds.")
    query_times['Worst Air'] = (end_time - start_time)
except InfluxDBError as e:
    print('Error: ', e)

Query took 1.1976702213287354 seconds.


In [81]:
try:
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = QueryApi(influxdb_client=client)
    start_time = time.time()
    result = query_api.query_data_frame(avg_temp_humidity_ratio_query)
    end_time = time.time()
    print(f"Query took {end_time - start_time} seconds.")
    query_times['Avg Temp Humidity Ratio'] = (end_time - start_time)
except InfluxDBError as e:
    print('Error: ', e)

Query took 1.7272686958312988 seconds.


# Summary

In [82]:
print(query_times)

{'Average Humidity': 0.16396498680114746, 'Highest Temp': 0.01749563217163086, 'Worst Air': 1.1976702213287354, 'Avg Temp Humidity Ratio': 1.7272686958312988}


In [83]:
performance = "InfluxDB query times: " + str(query_times)

In [84]:
filename = "performance.txt"


# 'a' öffnet die Datei im Append-Modus (erstellt sie, falls sie nicht existiert)
with open(filename, "a", encoding="utf-8") as f:
    f.write(performance + "\n")

print(performance)


InfluxDB query times: {'Average Humidity': 0.16396498680114746, 'Highest Temp': 0.01749563217163086, 'Worst Air': 1.1976702213287354, 'Avg Temp Humidity Ratio': 1.7272686958312988}
