In [31]:
import time
from datetime import datetime
import random
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import certifi
import pandas as pd

## Database Connection

In [32]:
# InfluxDB configuration
token = '8Kph_oFbnNqTvgUfERhTJveJpw1A08-HDsnhftEIYj8HRSUJSmKIdoPZdaIbNeIkG4likhDrOtr5FgyF5W03hg=='
org = "Ethan"
bucket = "DashboardTest"  # Use "bucket" instead of "database" in influxdb-client
url = "https://us-east-1-1.aws.cloud2.influxdata.com"

# Create InfluxDB client
client = InfluxDBClient(url=url, token=token, org=org, ssl_ca_cert=certifi.where())
write_api = client.write_api(write_options=SYNCHRONOUS)

In [33]:
# Original data templates
dht_sensor_data_template = [
    {"Zone": "Zone 1", "Temp_C": 20.5, "Temp_F": 68.9, "Humidity": 55},
    {"Zone": "Zone 2", "Temp_C": 21.0, "Temp_F": 69.8, "Humidity": 60},
    {"Zone": "Zone 3", "Temp_C": 19.8, "Temp_F": 67.6, "Humidity": 58},
]

water_sensor_data_template = {"RTD": 20.97, "pH": 7.38, "EC": 1430}

In [34]:
# Helper function to generate random values within a margin of error
def randomize_value(value, margin=0.15):
    error = value * margin
    return round(random.uniform(value - error, value + error), 2)

# Function to generate randomized data
def generate_dht_data():
    return [
        {
            "Zone": entry["Zone"],
            "Temp_C": randomize_value(entry["Temp_C"]),
            "Temp_F": randomize_value(entry["Temp_F"]),
            "Humidity": randomize_value(entry["Humidity"]),
        }
        for entry in dht_sensor_data_template
    ]

def generate_water_data():
    return {
        "RTD": randomize_value(water_sensor_data_template["RTD"]),
        "pH": randomize_value(water_sensor_data_template["pH"]),
        "EC": randomize_value(water_sensor_data_template["EC"]),
    }

# Function to upload DHT sensor data
def upload_dht_data(data):
    for entry in data:
        point = (
            Point("dht_sensors")
            .tag("Zone", entry["Zone"])
            .field("Temp_C", entry["Temp_C"])
            .field("Temp_F", entry["Temp_F"])
            .field("Humidity", entry["Humidity"])
        )
        write_api.write(bucket=bucket, org=org, record=point)

# Function to upload water sensor data
def upload_water_data(data):
    point = (
        Point("water_sensors")
        .field("RTD", data["RTD"])
        .field("pH", data["pH"])
        .field("EC", data["EC"])
    )
    write_api.write(bucket=bucket, org=org, record=point)


In [36]:
# Main loop to run at the top of every minute for 5 minutes
for i in range(1, 6):
    # Wait until the top of the next minute
    now = time.time()
    sleep_time = 60 - (now % 60)
    time.sleep(sleep_time)

    # Generate and upload data
    dht_data = generate_dht_data()
    water_data = generate_water_data()

    upload_dht_data(dht_data)
    upload_water_data(water_data)

    print(f"Uploaded Minute [{i}]")

Uploaded Minute [1]
Uploaded Minute [2]
Uploaded Minute [3]
Uploaded Minute [4]
Uploaded Minute [5]


In [35]:
datetime.now()

datetime.datetime(2024, 10, 18, 11, 39, 35, 408072)

## Sample Queries

Return All DHT Data

In [40]:
# Query
querier = client.query_api()
tables = querier.query(f'''
from(bucket: "{bucket}")
  |> range(start: -1y)
  |> filter(fn: (r) => r["_measurement"] == "dht_sensors")''',  #DHT Data 
  org=org)

# Data Extraction
records = []
for table in tables:
    for record in table.records:
        records.append({
            'time': record.get_time(),
            'sensor': record.values['sensor'],
            record.get_field(): record.get_value()
        })

# Convert to Pandas DataFrame
df = pd.DataFrame(records)
df = df.pivot_table(index=['time', 'sensor'], values=['Temp_C', 'Temp_F', 'Humidity'], aggfunc='first').reset_index() # Convert to fields
df['time'] = pd.to_datetime(df['time']).dt.tz_convert('US/Eastern') # Adjust 'time' to EST

df

Unnamed: 0,time,sensor,Humidity,Temp_C,Temp_F
0,2024-09-09 14:41:03.937043-04:00,DHT17,55.0,20.5,68.9
1,2024-09-09 14:41:05.114432-04:00,DHT18,60.0,21.0,69.8
2,2024-09-09 14:41:06.189103-04:00,DHT22,58.0,19.8,67.6


Return All Water Data

In [38]:
# Query
querier = client.query_api()
tables = querier.query(f'''
from(bucket: "{bucket}")
  |> range(start: -1y)
  |> filter(fn: (r) => r["_measurement"] == "water_sensors")''',  #Water data
  org=org)

# Data Extraction
records = []
for table in tables:
    for record in table.records:
        records.append({
            'time': record.get_time(),
            'sensor': record.values['sensor'],
            record.get_field(): record.get_value()
        })

# Convert to Pandas DataFrame
df = pd.DataFrame(records)
df = df.pivot_table(index=['time', 'sensor'], values=['EC', 'pH', 'RTD'], aggfunc='first').reset_index() # Convert to fields
df['time'] = pd.to_datetime(df['time']).dt.tz_convert('US/Eastern') # Adjust 'time' to EST

df

Unnamed: 0,time,sensor,EC,RTD,pH
0,2024-09-09 14:41:07.772529-04:00,water_sensor,1.1,0.98,6.5
1,2024-09-09 14:41:08.943115-04:00,water_sensor,1.2,0.95,6.6
2,2024-09-09 14:41:10.043278-04:00,water_sensor,1.3,0.99,6.4


Return Specific DHT Data

In [39]:
# Query
querier = client.query_api()
tables = querier.query(f'''
from(bucket: "{bucket}")
  |> range(start: -1y)
  |> filter(fn: (r) => r["_measurement"] == "dht_sensors")
  |> filter(fn: (r) => r["sensor"] == "DHT17")''',  # DHT17 Data
  org=org)

# Data Extraction
records = []
for table in tables:
    for record in table.records:
        records.append({
            'time': record.get_time(),
            'sensor': record.values['sensor'],
            record.get_field(): record.get_value()
        })

# Convert to Pandas DataFrame
df = pd.DataFrame(records)
df = df.pivot_table(index=['time', 'sensor'], values=['Temp_C', 'Temp_F', 'Humidity'], aggfunc='first').reset_index() # Convert to fields
df['time'] = pd.to_datetime(df['time']).dt.tz_convert('US/Eastern') # Adjust 'time' to EST

df

Unnamed: 0,time,sensor,Humidity,Temp_C,Temp_F
0,2024-09-09 14:41:03.937043-04:00,DHT17,55.0,20.5,68.9
