In [2]:
import datetime
import json
import os
import time
from urllib.parse import quote, urlencode

import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
import requests
from influxdb import DataFrameClient, InfluxDBClient
from pyarrow import parquet

# Connection Parameters
# Read from the environment when set so real credentials never have to be
# committed with the notebook; the defaults are the original test values.
# NOTE(review): the REST query URLs built below target port 8086, while the
# client uses PORT (8083 by default) — confirm which port serves the HTTP API.
HOST = os.environ.get("INFLUX_HOST", "localhost")
PORT = int(os.environ.get("INFLUX_PORT", "8083"))
USER = os.environ.get("INFLUX_USER", "testuser")
PW = os.environ.get("INFLUX_PASSWORD", "testpw")
DB = os.environ.get("INFLUX_DB", "grid1_ts")

In [34]:
## Create InfluxDB Client
def createIfClient(host, port, username, password, database) :
    """Build an ``InfluxDBClient`` from explicit connection settings.

    All five arguments are forwarded unchanged, by keyword, to the
    ``InfluxDBClient`` constructor; the new client is returned.
    """
    settings = dict(host=host, port=port, username=username,
                    password=password, database=database)
    return InfluxDBClient(**settings)

## Get list of Measurements
def getMeasurements(client:InfluxDBClient):
    """Return the measurement names reported by ``client``.

    ``client.get_list_measurements()`` yields one dict per measurement; the
    first value of each dict is taken as the name, preserving order.
    """
    names = []
    for entry in client.get_list_measurements():
        names.append(tuple(entry.values())[0])
    return names

## Get list of DBs except internal db
def getDBs(client:InfluxDBClient):
    """Return the database names known to ``client``, skipping ``_internal``.

    The first value of each dict from ``client.get_list_database()`` is taken
    as the database name; order is preserved.
    """
    candidates = (tuple(entry.values())[0] for entry in client.get_list_database())
    return [name for name in candidates if name != '_internal']

## Get list of Tags
def getTagKeys(client: "InfluxDBClient", measurements: list):
    """Return the tag keys of every measurement in ``measurements``.

    Runs one ``SHOW TAG KEYS`` query per measurement and concatenates the
    keys in order. A key shared by several measurements therefore appears
    once per measurement (same as before).

    The measurement name is sent as a double-quoted InfluxQL identifier, with
    embedded ``"`` escaped, so names containing spaces, hyphens or other
    special characters still parse. A measurement whose result has no series
    simply contributes nothing instead of failing on ``[0]``.

    Args:
        client: an InfluxDB client exposing ``query(query=...)``.
        measurements: measurement names to inspect.

    Returns:
        list of tag-key names.
    """
    tagList = []
    for measurement in measurements:
        escaped = str(measurement).replace('"', '\\"')
        Q = 'SHOW TAG KEYS FROM "{0}"'.format(escaped)
        # get_points() walks every series in the result set (possibly none);
        # each point is a one-entry dict whose value is the tag key.
        for point in client.query(query=Q).get_points():
            tagList.append(list(point.values())[0])
    return tagList
    
## Create Queries
def createSelectQueries(msList:list, db,tag:str, interval:int, base_url:str="http://localhost:8086"):
    """Build one InfluxDB ``/query`` GET URL per measurement.

    Each URL runs
    ``select mean(*) from "<ms>" group by time(<interval>m), "<tag>"``
    against database ``db``: the mean of every field per ``interval``-minute
    bucket, split into one series per value of ``tag``.

    Args:
        msList: measurement names, one URL each.
        db: database name (sent as the ``db`` query parameter).
        tag: tag key to group by.
        interval: bucket width in minutes.
        base_url: scheme/host/port of the InfluxDB HTTP API (default keeps the
            previously hardcoded address).

    Returns:
        list of URL strings, in ``msList`` order.
    """
    def ident(name):
        # InfluxQL double-quoted identifier with embedded quotes escaped.
        return '"{0}"'.format(str(name).replace('"', '\\"'))

    root = base_url.rstrip("/")
    queryList = []
    for ms in msList:
        # The old string had an empty ``where`` before ``group by``, which is
        # not a valid InfluxQL statement; there is no filter, so drop it.
        q = "select mean(*) from {0} group by time({1}m), {2}".format(ident(ms), interval, ident(tag))
        # Percent-encode everything (spaces, quotes, parentheses, '*', ',').
        params = urlencode({"db": db, "q": q}, quote_via=quote)
        queryList.append("{0}/query?{1}".format(root, params))
    return queryList

## Create Delete Queries
def createDeleteQueries(msList:list, db:str, base_url:str="http://localhost:8086"):
    """Build one InfluxDB ``/query`` URL per measurement running ``delete from "<ms>"``.

    The measurement is sent as a double-quoted identifier (embedded ``"``
    escaped) and the whole query string is percent-encoded, so unusual
    measurement names cannot break the statement.

    NOTE(review): InfluxDB 1.x only accepts read statements (SELECT/SHOW) over
    GET on ``/query``; send these URLs with POST (e.g. ``requests.post``) —
    confirm against the server version in use.

    Args:
        msList: measurement names, one URL each.
        db: database name (sent as the ``db`` query parameter).
        base_url: scheme/host/port of the InfluxDB HTTP API (default keeps the
            previously hardcoded address).

    Returns:
        list of URL strings, in ``msList`` order.
    """
    root = base_url.rstrip("/")
    queryList = []
    for ms in msList:
        escaped = str(ms).replace('"', '\\"')
        q = 'delete from "{0}"'.format(escaped)
        params = urlencode({"db": db, "q": q}, quote_via=quote)
        queryList.append("{0}/query?{1}".format(root, params))
    return queryList


In [36]:
## Client used for the initial metadata queries (measurements, tags)
client = createIfClient(host=HOST, port=PORT, username=USER, password=PW, database=DB)

In [22]:
## Build the per-measurement select / delete URLs
# Use DB from the config cell: `client` was created for DB, so the URLs must
# target the same database rather than a repeated "grid1_ts" literal.
# Width, in minutes, of the mean(*) downsampling buckets.
SELECT_INTERVAL_MIN = 1
measurements = getMeasurements(client)
cq = createSelectQueries(msList=measurements, tag="SERVICE_ID", db=DB, interval=SELECT_INTERVAL_MIN)
dq = createDeleteQueries(measurements, db=DB)

In [23]:
## HTTP headers sent with every REST call: request a JSON response body
headers = dict(accept="application/json")

In [42]:
## Repeat for every field (one select URL per measurement in `cq`)
# For each measurement: fetch the downsampled means grouped by SERVICE_ID,
# flatten every returned series into one DataFrame and add it to a Parquet
# dataset partitioned by FIELD / SERVICE_ID.
# NOTE(review): write_to_dataset adds new files on each run, so re-running
# this cell likely duplicates rows under ../data/compacted.gzip — clear it first.
for url in cq:
    ## Request and get response; the timeout stops a dead server hanging the cell
    response = requests.get(url, headers=headers, timeout=60)
    if not response.ok:
        # Skip this field; previously execution fell through and crashed
        # while parsing the error body.
        print("Error at Query{0}".format(url))
        continue
    statement = response.json()["results"][0]
    if "error" in statement:
        # InfluxDB can report a statement error inside an HTTP 200 response.
        print("Error at Query{0}".format(url))
        print(statement["error"])
        continue
    # A measurement with no points has no "series" key; nothing to write.
    series_list = statement.get("series", [])
    if not series_list:
        continue

    frames = []
    ## Repeat for every service (one series per SERVICE_ID value)
    for series in series_list:
        ## JSON series to DataFrame
        df = pd.DataFrame(data=series["values"], columns=series["columns"])
        ## Cast value columns to float; astype returns a copy, so assign it back
        value_cols = df.columns.difference(["time"])
        df[value_cols] = df[value_cols].astype(float)
        ## Parse the time strings into timestamps (vectorized)
        df["time"] = pd.to_datetime(df["time"], format="%Y-%m-%dT%H:%M:%SZ")
        ## Insert tag values used as partition keys
        df["SERVICE_ID"] = series["tags"]["SERVICE_ID"]
        df["FIELD"] = series["name"]
        frames.append(df)

    # DataFrame.append was removed in pandas 2.0 (and is quadratic in a loop);
    # concatenate once with a fresh index instead.
    res = pd.concat(frames, ignore_index=True)
    ## Convert to Parquet partitioned by FIELD and SERVICE_ID. "time" stays a
    ## regular column (the old `res.set_index("time")` result was discarded).
    parquet.write_to_dataset(
        pa.Table.from_pandas(res, preserve_index=False),
        root_path="../data/compacted.gzip",
        partition_cols=["FIELD", "SERVICE_ID"],
    )

In [43]:
## Read the partitioned dataset back and convert it to pandas
## (the FIELD / SERVICE_ID partition directories come back as columns)
tmp = parquet.read_table(source="../data/compacted.gzip")
table = pa.Table.to_pandas(tmp)

In [44]:
# Display the combined frame read back from Parquet (pandas elides the middle
# rows of long frames in the rendered output).
table

Unnamed: 0,time,mean_SERVICE_DC_CURRENT,mean_SERVICE_DC_VOLTAGE,mean_SERVICE_MAX_CELL_VOLTAGE,mean_SERVICE_MAX_MODULE_TEMPERATURE,mean_SERVICE_MIN_CELL_VOLTAGE,mean_SERVICE_MIN_MODULE_TEMPERATURE,mean_SERVICE_SOC,mean_SERVICE_SOH,FIELD,SERVICE_ID
0,2023-05-11 15:06:00,0.0,748.3,3.7,26.90,3.7,24.1,48.4,100.0,field1,1
1,2023-05-11 15:07:00,0.0,748.3,3.7,26.85,3.7,24.1,48.5,100.0,field1,1
2,2023-05-11 15:08:00,0.0,748.3,3.7,26.90,3.7,24.1,48.5,100.0,field1,1
3,2023-05-11 15:09:00,0.0,748.3,3.7,26.90,3.7,24.1,48.5,100.0,field1,1
4,2023-05-11 15:10:00,0.0,748.3,3.7,26.90,3.7,24.1,48.4,100.0,field1,1
...,...,...,...,...,...,...,...,...,...,...,...
2876,2023-06-02 23:56:00,432.1,890.4,,,,,,,field2,3
2877,2023-06-02 23:57:00,432.1,890.4,,,,,,,field2,3
2878,2023-06-02 23:58:00,432.1,890.4,,,,,,,field2,3
2879,2023-06-02 23:59:00,432.1,890.4,,,,,,,field2,3
