Start

see: https://www.influxdata.com/blog/forecasting-with-fb-prophet-and-influxdb/

In [None]:
pip install pandas prophet influxdb-client

Import dependencies:

In [None]:
import pandas as pd
import time
from datetime import datetime
from prophet import Prophet

Define the authentication parameters and connect to InfluxDB:



In [None]:
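from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Replace these placeholders with your own token, bucket, and organization
token = "$my-token"
bucket = "$my-bucket"
org = "$my-org"
# Adjust the URL to match your instance (InfluxDB 2.x releases default to port 8086)
client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)
query_api = client.query_api()
write_api = client.write_api(write_options=SYNCHRONOUS)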

Create a Flux query:

In [None]:
query = 'from(bucket:"fbprophet")' \
        ' |> range(start:2007-12-10T15:00:00Z, stop:2016-01-20T15:00:00Z)'\
        ' |> filter(fn: (r) => r._measurement == "views")' \
        ' |> filter(fn: (r) => r._field == "y")'

Query InfluxDB and return the results:

In [None]:
result = query_api.query(org=org, query=query)

Convert the results into a list:

In [None]:
raw = []
for table in result:
    for record in table.records:
        raw.append((record.get_value(), record.get_time()))
print(raw[0:5])

Convert raw data to DataFrame:

In [None]:
print()
print("=== influxdb query into dataframe ===")
print()
df = pd.DataFrame(raw, columns=['y', 'ds'], index=None)
# Prophet expects timezone-naive dates in 'ds': drop the time zone and truncate to whole days
df['ds'] = df['ds'].values.astype('<M8[D]')
df.head()
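
The `astype('<M8[D]')` call does two things at once: it drops the UTC time zone that the InfluxDB client typically attaches to each timestamp, and it truncates every timestamp to whole days. The same transformation on a single value can be sketched with the standard library alone. `to_naive_utc_day` is a hypothetical helper name, not part of pandas or Prophet, and the sample timestamp is made up for illustration:

```python
from datetime import datetime, timedelta, timezone


def to_naive_utc_day(ts: datetime) -> datetime:
    """Convert a timezone-aware datetime to a naive UTC datetime at midnight."""
    utc = ts.astimezone(timezone.utc)
    return datetime(utc.year, utc.month, utc.day)


# A made-up timestamp in UTC+02:00 that still falls on the previous day in UTC
sample = datetime(2016, 1, 20, 1, 30, tzinfo=timezone(timedelta(hours=2)))
print(to_naive_utc_day(sample))  # 2016-01-19 00:00:00
```

Note that the day is taken after converting to UTC, so a timestamp recorded in another zone can land on a different calendar day than its local date suggests.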

Fit the model by instantiating a new Prophet object and passing in the historical DataFrame:



In [None]:
m = Prophet()
m.fit(df)

Use the helper method Prophet.make_future_dataframe to prepare your dataframe for forecasting:

In [None]:
# periods=365 extends the DataFrame by 365 days (the default frequency is daily) past the last observation
future = m.make_future_dataframe(periods=365)
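
To make the effect of `periods` concrete, the following standard-library sketch mimics the dates such a frame would hold for daily data: the historical dates followed by `periods` additional days. It is an illustration only, not Prophet's implementation; `future_dates` is a hypothetical helper and the three history dates are made up:

```python
from datetime import date, timedelta


def future_dates(history, periods):
    """Return the historical dates followed by `periods` further daily dates."""
    last = max(history)
    extension = [last + timedelta(days=i) for i in range(1, periods + 1)]
    return sorted(history) + extension


# Made-up history of three consecutive days
history = [date(2016, 1, 18), date(2016, 1, 19), date(2016, 1, 20)]
dates = future_dates(history, periods=365)
print(len(dates), dates[-1])  # 368 2017-01-19
```

Because the historical dates are kept, the later prediction covers both the observed range and the forecast horizon.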

Make your prediction:

In [None]:
# predict assigns each row in future a predicted value (yhat) along with the lower and upper bounds of its uncertainty interval (yhat_lower, yhat_upper). The forecast also contains columns for the individual components, which aren't displayed here.

forecast = m.predict(future)
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()

## Converting the DataFrame to Line Protocol

Now we’re ready to convert our prediction to Line Protocol and write it to our instance.

Add a measurement column to our DataFrame:

In [None]:
forecast['measurement'] = "views"

Convert the DataFrame to Line Protocol:



In [None]:
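import calendar

cp = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper', 'measurement']].copy()
# One Line Protocol string per row: measurement,tag_set field_set timestamp
# calendar.timegm reads 'ds' as UTC; time.mktime would apply the machine's local time zone
lines = [str(cp["measurement"][d])
         + ",type=forecast"
         + " "
         + "yhat=" + str(cp["yhat"][d]) + ","
         + "yhat_lower=" + str(cp["yhat_lower"][d]) + ","
         + "yhat_upper=" + str(cp["yhat_upper"][d])
         + " " + str(calendar.timegm(cp['ds'][d].timetuple())) + "000000000" for d in range(len(cp))]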
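
Each element of `lines` follows the Line Protocol layout `measurement,tag_set field_set timestamp`, with the timestamp in nanoseconds. The hypothetical helper below (not part of the InfluxDB client) builds one such line from plain Python values. It is a minimal sketch that assumes float field values and names that need no escaping, and the field values shown are made up. It uses `calendar.timegm`, which interprets the time tuple as UTC regardless of the machine's local time zone:

```python
import calendar
from datetime import datetime


def to_line(measurement, tags, fields, ts):
    """Build one Line Protocol line with a nanosecond UTC timestamp."""
    # Measurement and tags are joined by commas, with tags in sorted order
    head = ",".join([measurement] + [f"{k}={v}" for k, v in sorted(tags.items())])
    field_set = ",".join(f"{k}={float(v)}" for k, v in fields.items())
    # Seconds since the epoch (UTC), scaled to nanoseconds
    nanos = calendar.timegm(ts.timetuple()) * 1_000_000_000
    return f"{head} {field_set} {nanos}"


# Made-up forecast values for a single day
line = to_line("views", {"type": "forecast"},
               {"yhat": 8.2, "yhat_lower": 7.5, "yhat_upper": 8.9},
               datetime(2016, 1, 20))
print(line)
# views,type=forecast yhat=8.2,yhat_lower=7.5,yhat_upper=8.9 1453248000000000000
```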

Write the lines to your instance:

In [None]:
from influxdb_client import WriteOptions

# Batching write API: the interval options are given in milliseconds
_write_client = client.write_api(write_options=WriteOptions(batch_size=1000,
                                                            flush_interval=10_000,
                                                            jitter_interval=2_000,
                                                            retry_interval=5_000))

_write_client.write(bucket, org, lines)
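
With these options the client buffers the lines and sends them in batches instead of issuing one request per line; `batch_size=1000` caps the number of lines per batch. As a rough mental model only (this is not the client's implementation, and `chunked` is a hypothetical helper operating on made-up lines), batching by size amounts to:

```python
def chunked(items, batch_size):
    """Yield consecutive slices of `items` holding at most `batch_size` elements."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# 2500 made-up Line Protocol strings
example_lines = [f"views,type=forecast yhat={i}.0 {i}" for i in range(2500)]
batches = list(chunked(example_lines, 1000))
print([len(b) for b in batches])  # [1000, 1000, 500]
```

The real client additionally flushes partially filled batches once the flush interval elapses, which this sketch leaves out.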

Close the client:

In [None]:
# close() flushes any batches that are still pending before releasing resources
_write_client.close()
client.close()

## Visualizing our forecast with InfluxDB UI
Finally, we can use the UI to quickly visualize the forecast we made. We simply select the bucket, measurement, and field values that we want to display to construct a Flux query. Hitting “Submit” then renders the forecast as a graph.
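
For reference, the selections described above correspond to a Flux query along the following lines. This is a sketch under assumptions, not the exact query the UI generates: `forecast_query` is a hypothetical helper, and the bucket name and time range are placeholders to replace with your own.

```python
def forecast_query(bucket, start, stop):
    """Return a Flux query selecting the three forecast fields of the 'views' measurement."""
    return (
        f'from(bucket:"{bucket}")'
        f' |> range(start: {start}, stop: {stop})'
        ' |> filter(fn: (r) => r._measurement == "views")'
        ' |> filter(fn: (r) => r["type"] == "forecast")'
        ' |> filter(fn: (r) => r._field == "yhat"'
        ' or r._field == "yhat_lower" or r._field == "yhat_upper")'
    )


# Placeholder bucket and a range wide enough to include the forecast horizon
q = forecast_query("my-bucket", "2007-12-10T15:00:00Z", "2017-01-20T15:00:00Z")
print(q)
```

Filtering on the `type` tag keeps the forecast series apart from the original `y` observations stored under the same measurement.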