In [1]:
import os
import time
from datetime import datetime, timedelta

import pandas as pd
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from prophet import Prophet

In [2]:
# Connection settings for the InfluxDB instance used throughout the notebook.
# (A module-level ``global`` statement has no effect, so none is needed here.)
bucket_inf = 'sensor_readings'
org = os.environ.get("INFLUXDB_V2_ORG", "zhanel.zhexenova@alumni.nu.edu.kz")
# The API token is a secret and must never be stored in the notebook: export
# INFLUXDB_V2_TOKEN before starting the kernel.
# NOTE(review): the token that used to be hard-coded here has been exposed and
# should be revoked in the InfluxDB console.
token = os.environ.get("INFLUXDB_V2_TOKEN", "")
if not token:
    print("WARNING: INFLUXDB_V2_TOKEN is not set; InfluxDB requests will be rejected.")
# Store the URL of your InfluxDB instance
url = os.environ.get("INFLUXDB_V2_URL", "https://europe-west1-1.gcp.cloud2.influxdata.com")

In [3]:
# Single InfluxDB client shared by the query and write code in later cells.
client = InfluxDBClient(url=url, token=token, org=org)

# Forecasting and query

In [4]:
def _records_to_frame(samples, utc_offset_hours=2):
    """Turn ``(value, timestamp)`` pairs into a frame with ``y`` and ``ds`` columns.

    Timestamps are truncated to whole seconds, stored as timezone-naive values
    and then shifted forward by ``utc_offset_hours``.
    """
    frame = pd.DataFrame(samples, columns=['y', 'ds'])
    # Going through the raw NumPy values and '<M8[s]' truncates to seconds and
    # leaves naive datetimes; converting back to ns keeps the column dtype
    # independent of the installed pandas version.
    whole_seconds = frame['ds'].values.astype('<M8[s]')
    frame['ds'] = whole_seconds.astype('<M8[ns]')
    # Fixed shift because UTC is behind local time by 2h.
    # NOTE(review): a constant offset ignores daylight-saving changes.
    frame['ds'] = frame['ds'] + pd.Timedelta(hours=utc_offset_hours)
    return frame


def data_retriever(ds_time='-26h'):
    """Fetch recent humidity, temperature and gas readings from InfluxDB.

    Parameters
    ----------
    ds_time : str
        Start of the Flux ``range()``, e.g. ``'-26h'``. It is inserted into the
        query text verbatim, so it must come from trusted code.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(humidity, temperature, gas)``; each frame has a ``y`` column with
        the values and a ``ds`` column with naive timestamps shifted by +2 h.
        A signal with no records yields an empty frame.
    """
    # Query the configured bucket (bucket_inf) for the three signals of one device.
    query = ''.join([
        'from(bucket: "{0}")'.format(bucket_inf),
        '|> range(start: ' + ds_time + ')',
        '|> filter(fn: (r) => r["_measurement"] == "esp32")',
        '|> filter(fn: (r) => r["GPS_lat"] == "44.507393")',
        '|> filter(fn: (r) => r["GPS_lon"] == "11.356048")',
        '|> filter(fn: (r) => r["ID"] == "ESP32_001")',
        '|> filter(fn: (r) => r["_field"] == "humidity" or r["_field"] == "temperature" or r["_field"] == "gas")',
    ])
    tables = client.query_api().query(org=org, query=query)

    # Collect (value, timestamp) pairs per field name.
    samples = {'humidity': [], 'temperature': [], 'gas': []}
    for table in tables:
        for record in table.records:
            readings = samples.get(record.get_field())
            if readings is not None:
                readings.append((record.get_value(), record.get_time()))

    humidity = _records_to_frame(samples['humidity'])
    temperature = _records_to_frame(samples['temperature'])
    gas = _records_to_frame(samples['gas'])
    return humidity, temperature, gas

def predictor(df, time_future=60):
    """Fit a Prophet model on ``df`` and forecast a single future point.

    Parameters
    ----------
    df : pandas.DataFrame
        Training history; it is passed to ``Prophet.fit`` unchanged and its
        ``ds`` column supplies the last known timestamp.
    time_future : int
        Number of seconds after the last row's ``ds`` to forecast.

    Returns
    -------
    pandas.DataFrame
        The frame returned by ``Prophet.predict`` for that one timestamp.
    """
    model = Prophet(
        yearly_seasonality=False,
        weekly_seasonality=False,
        daily_seasonality=False,
        changepoint_prior_scale=0.3,
    )
    model.fit(df)

    # The only row to predict: the last timestamp moved forward by the horizon.
    horizon = timedelta(seconds=time_future)
    last_stamp = df['ds'][-1:]
    future = pd.DataFrame()
    future['ds'] = last_stamp + horizon

    return model.predict(future)

In [5]:
# Forecast loop: every cycle pulls recent readings, fits one model per signal
# and writes the three one-step forecasts back to InfluxDB.
# Interrupt the kernel to stop it.

LOG_TIME_FORMAT = "%d-%m-%y %H:%M:%S"
# data_retriever() shifts timestamps by +2 h; undo that shift before writing.
LOCAL_UTC_OFFSET = timedelta(hours=2)
# Pause between two cycles, in seconds.
POLL_SECONDS = 30


def _prediction_line(field_name, forecast):
    """Return the line-protocol record for the first row of ``forecast``."""
    point = (
        Point("esp32")
        .tag("GPS_lat", "44.507393")
        .tag("GPS_lon", "11.356048")
        .tag("ID", "ESP32_001")
        # NOTE(review): the value is written as a string, as it always has been.
        # Switching to float would be cleaner but may conflict with the type of
        # the fields already stored in the bucket - confirm before changing.
        .field(field_name, str(forecast['yhat'][0]))
        .time(forecast['ds'][0] - LOCAL_UTC_OFFSET)
    )
    return point.to_line_protocol()


# One writer for the whole run, instead of a new and never-closed one per cycle.
write_api = client.write_api(write_options=SYNCHRONOUS)
try:
    while True:
        print('Query sent to Influx at {0}'.format(datetime.now().strftime(LOG_TIME_FORMAT)))
        humidity, temperature, gas = data_retriever()

        print('Data fed to predictor at {0}'.format(datetime.now().strftime(LOG_TIME_FORMAT)))
        pred_humidity = predictor(df=humidity)
        pred_temp = predictor(df=temperature)
        pred_gas = predictor(df=gas)

        # Writing to influx: the three records go out in a single request.
        write_api.write(bucket_inf, org, [
            _prediction_line("temperature_predicted", pred_temp),
            _prediction_line("hum_predicted", pred_humidity),
            _prediction_line("gas_predicted", pred_gas),
        ])

        print('Data sent to Influx at {0}: Temperature-{1:.2f}, Humidity -{2:.2f}, Gas - {3:.2f} at time {4}'.format(
            datetime.now().strftime(LOG_TIME_FORMAT),
            pred_temp['yhat'][0],
            pred_humidity['yhat'][0],
            pred_gas['yhat'][0],
            pred_gas['ds'][0].strftime(LOG_TIME_FORMAT)))
        time.sleep(POLL_SECONDS)
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to end the loop.
    print('Forecast loop stopped by user')
finally:
    # Release the writer's resources however the loop ends.
    write_api.close()

Query sent to Influx at 15-07-22 23:06:22
Data fed to predictor at 15-07-22 23:06:25
Data sent to Influx at 15-07-22 23:06:45: Temperature-26.09, Humidity -48.65, Gas - 2122.39 at time 15-07-22 23:06:57
Query sent to Influx at 15-07-22 23:07:15
Data fed to predictor at 15-07-22 23:07:17
Data sent to Influx at 15-07-22 23:07:37: Temperature-26.10, Humidity -48.64, Gas - 2122.21 at time 15-07-22 23:07:57
Query sent to Influx at 15-07-22 23:08:07
Data fed to predictor at 15-07-22 23:08:09
Data sent to Influx at 15-07-22 23:08:29: Temperature-26.10, Humidity -48.62, Gas - 2122.06 at time 15-07-22 23:08:57
Query sent to Influx at 15-07-22 23:08:59
Data fed to predictor at 15-07-22 23:09:01
Data sent to Influx at 15-07-22 23:09:23: Temperature-26.11, Humidity -48.62, Gas - 2121.98 at time 15-07-22 23:09:57
Query sent to Influx at 15-07-22 23:09:53
Data fed to predictor at 15-07-22 23:09:55
Data sent to Influx at 15-07-22 23:10:13: Temperature-26.11, Humidity -48.63, Gas - 2121.98 at time 15-