# Telemetry data fetching & processing

## Data Ingestion

In [None]:
#Load API credentials from .env file & initialize global variables 

import os
from dotenv import load_dotenv

# Pull the variables defined in the secret .env file into the process environment
load_dotenv()

# API credentials read from the environment (None when a variable is not set)
CLIENT_ID = os.environ.get('CLIENT_ID')
CLIENT_SECRET = os.environ.get('CLIENT_SECRET')

# Uncomment to verify that both credentials were found
#if CLIENT_ID is not None and CLIENT_SECRET is not None:
    #print(CLIENT_ID)
    #print(CLIENT_SECRET)

# Other global variables
THING_ID = "7db1852a-9709-471f-b049-7505253ceaad"  # id of the Arduino Thing of interest
SPACE_ID = '6b795d20-54e3-483c-a322-7114527741a1'  # id of the shared space needed to access data

### Get data from Arduino IOT


In [11]:
import requests
import pandas as pd
import json
import time

def getHeader():
    """Request a fresh access token and build the standard API request header.

    The header carries the authorization (bearer access token) and the
    x-organization (space id) entries. It is meant to be regenerated before
    each API call because tokens expire quickly (about every 3 minutes
    according to the original author's note).

    Returns:
        dict: {'authorization': 'Bearer <token>', 'x-organization': SPACE_ID}

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
        requests.Timeout: if the token endpoint does not answer in time.
    """
    token_response = requests.post(
        'https://api2.arduino.cc/iot/v1/clients/token',
        headers={'content-type': 'application/x-www-form-urlencoded'},
        data={
            'grant_type': 'client_credentials',
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            'audience': 'https://api2.arduino.cc/iot',
        },
        timeout=30,  # do not hang forever on an unresponsive endpoint
    )
    # Surface bad credentials / server errors as an HTTP error instead of an
    # opaque KeyError on the missing 'access_token' entry below.
    token_response.raise_for_status()

    access_token = 'Bearer ' + token_response.json()['access_token']

    # standard header for the http requests
    return {'authorization': access_token, 'x-organization': SPACE_ID}

def getPropertyIDs() -> dict:
    """Fetch the properties of the Thing and map each property id to its name.

    Properties listed in `exclude` are left out: they are not numerical and
    an error is returned when trying to fetch their data.

    Returns:
        dict: key: property id, value: property name, for every property
            that will be queried.

    Raises:
        requests.HTTPError: if the properties endpoint answers with an error status.
    """
    # TO DO: implement a better way of regenerating headers when needed
    header = getHeader()

    # TO DO: add try block around this ?
    url = f"https://api2.arduino.cc/iot/v2/things/{THING_ID}/properties/"
    response = requests.request('GET', url, headers=header)
    # An error body is not a list of properties; fail here with the HTTP status
    # rather than with a confusing TypeError while iterating it.
    response.raise_for_status()

    # these are not numerical and you get an error when trying to fetch their data
    exclude = ['canMessage', 'motorOn', 'gpsCoordinates']

    # build the property dictionary (id -> name) for all properties that will be queried
    return {
        prop["id"]: prop['name']
        for prop in response.json()
        if prop['name'] not in exclude
    }

def getPropertySeries(thing_id, prop_id, from_t, interval):
    """Fetch the timeseries of one specific property.

    Args:
        thing_id: id of the Thing the property belongs to.
        prop_id: id of the property to query.
        from_t: start of the requested range, inserted as-is into the
            `from` query parameter.
        interval: data binning interval in seconds; the smallest possible
            interval is 1s.

    Returns:
        The decoded JSON body of the response. According to the original
        author's note the API limits the result to 1000 data points.
    """
    auth_header = getHeader()
    endpoint = f"https://api2.arduino.cc/iot/v2/things/{thing_id}/properties/{prop_id}/timeseries"
    query = f"?desc=true&from={from_t}&interval={interval}"
    return requests.get(endpoint + query, headers=auth_header).json()

class propertyRequest:
    """Holder for the parameters of one property-data request to the API.

    Only the parameters that were actually given (i.e. are not None) become
    attributes, so `vars(instance)` yields a dictionary representation of the
    request without empty entries. Designed for use in getRawBatchQuery.

    Author's note: if no from date is set, 1000 starting from
    0001-01-01T00:00:00Z will be returned.
    """

    def __init__(self, property_id, from_t=None, to_t=None, sort=None, interval=None, series_limit=None):
        # the query always targets a single property
        self.q = f"property.{property_id}"

        # optional parameters, kept in the order they should appear in vars()
        optional_params = (
            ("from_t", from_t),
            ("to_t", to_t),
            ("sort", sort),
            ("interval", interval),
            ("series_limit", series_limit),
        )
        for attr_name, attr_value in optional_params:
            if attr_value is not None:
                setattr(self, attr_name, attr_value)
            

def getRawBatchQuery(props, from_t=None, to_t=None, sort='DESC'):
    """Return the raw data points of several properties with one batch request.

    Args:
        props: dictionary whose keys are the ids of the properties to query.
        from_t: optional start of the time frame.
        to_t: optional end of the time frame.
        sort: sort order sent with every request, 'DESC' by default.

    Returns:
        The 'responses' entry of the decoded JSON answer. According to the
        original author's note at most 1000 data points are returned for
        each property.
    """
    header = getHeader()

    # batch_query_raw
    # mandatory: q
    # optional : from, to, sort, series_limit
    url = "https://api2.arduino.cc/iot/v2/series/batch_query_raw"

    def build_request(prop_id):
        # dictionary with the request parameters of one property
        params = vars(propertyRequest(prop_id, from_t, to_t, sort))
        # rename the time stamp keys to the names expected by the query format
        for attr_key, query_key in (("from_t", "from"), ("to_t", "to")):
            if params.get(attr_key):
                params[query_key] = params.pop(attr_key)
        return params

    # one parameter dictionary per property
    payload = {
        "resp_version": 1,
        "requests": [build_request(prop_id) for prop_id in props.keys()],
    }

    # the JSON query is passed as a byte array because this is the format it must be passed as
    body = bytearray(json.dumps(payload), "utf-8")

    # make request
    response = requests.request('POST', url, headers=header, data=body)
    return response.json()['responses']

def getBatchQuery(props, from_t=None, to_t=None, interval=None, series_limit=None):
    """Return data points at a given interval in a time range for several properties.

    Author's notes on the endpoint: the time range is not optional; this is
    not the raw data, averages are applied in order to get the requested
    number of data points (i.e. values at evenly distributed time intervals
    in the given range); about 300 data points seems to be the maximum for a
    series; this method is slower since it manipulates the values.

    Args:
        props: dictionary whose keys are the ids of the properties to query
            (same layout as the one used by getRawBatchQuery).
        from_t: start of the time range.
        to_t: end of the time range.
        interval: interval between the returned data points.
        series_limit: maximum number of data points per series.

    Returns:
        The 'responses' entry of the decoded JSON answer.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
    """
    # batch_query
    # mandatory: q, from, to, series_limit or interval
    # optional : sort?
    url = "https://api2.arduino.cc/iot/v2/series/batch_query"
    header = getHeader()  # get header with fresh token (tokens expire every 120? seconds)

    # build list of requests to be made
    # i.e. get list containing parameter dict for each property
    reqs = []
    # the property ids are the dictionary keys (the values are the names)
    for prop_id in props.keys():
        # Keyword arguments are required here: positionally, `interval` would
        # land in the `sort` parameter and `series_limit` in `interval`.
        req = vars(propertyRequest(prop_id, from_t=from_t, to_t=to_t,
                                   interval=interval, series_limit=series_limit))

        # rename time stamp keys to the proper name for the query format;
        # .get() because the attribute does not exist when the bound is None
        if req.get("from_t"):
            req["from"] = req.pop("from_t")
        if req.get("to_t"):
            req["to"] = req.pop("to_t")

        # add it to the list
        reqs.append(req)

    # get string of the JSON object representing the query
    query_str = json.dumps({"resp_version": 1, "requests": reqs})
    # convert to byte array because this is the format it must be passed as
    query_bytes = bytearray(query_str, "utf-8")

    # make request
    response = requests.request('POST', url, headers=header, data=query_bytes)
    # fail with the HTTP status instead of a KeyError on 'responses'
    response.raise_for_status()
    return response.json()['responses']

#props = getPropertyIDs()
#res = getBatchQuery(props, from_t="2023-09-22T19:20:00.00Z", to_t="2023-09-22T20:00:00.00Z", interval=1)


### Merge returned data into dataframe

In [14]:


def dataToDF(data, prop_names=None):
    """ Helper function for getData()
    Transform http request results to a dataframe of properties timeseries
    The timeseries data of all the properties is merged to a single dataframe

    Args:
        data (list of dict) : list of dicts corresponding to JSON formatted http
            request results of each property; each dict is read for its
            'query' (format property.<id>), 'times' and 'values' entries
        prop_names (dict of str:str, optional) : property id:property name,
            used to name the value columns; when omitted the module-level
            `props` dictionary is used

    Returns:
        tuple : (telemetry_data, stop_time) where telemetry_data is the merged
            dataframe (a 'time' column plus one column per property that had
            data) and stop_time is the smallest end time, i.e. the earliest of
            the latest timestamps of the properties that had data, or None
            when no property had data
    """
    if prop_names is None:
        # keep the original behaviour of reading the notebook-level dictionary
        prop_names = props

    telemetry_data = pd.DataFrame(columns=['time']) #main df to which all data will be merged

    stop_time = None #variable that tracks the smallest end time

    #add data for each property to the dataframe
    for prop_result in data:
        #query has format property.<id>
        prop_id = prop_result['query'].split(".", 1)[1]

        #get property name
        prop_name = prop_names[prop_id]

        #load next property data into a dataframe
        temp_df = pd.DataFrame({
            "time": prop_result['times'],
            prop_name: prop_result['values']})

        #if dataframe is empty, continue to the next property
        if temp_df.empty:
            continue

        #convert 'time' column values to dateTime data type
        temp_df['time'] = pd.to_datetime(temp_df['time'], format="ISO8601")
        #convert data column to numerical data type
        temp_df[prop_name] = pd.to_numeric(temp_df[prop_name])

        #end time of this series, independent of the sort order of the response
        series_end = temp_df['time'].max()
        stop_time = series_end if stop_time is None else min(stop_time, series_end)

        #merge it with the main df, merges on the time value in order and fills missing values with last value
        try:
            telemetry_data = pd.merge_ordered(telemetry_data, temp_df, how='outer', fill_method="ffill")
        except ValueError as err:
            #best effort: keep the column of the property but without its data, and say so
            print(f"could not merge data of property {prop_name}: {err}")
            telemetry_data = telemetry_data.reindex(telemetry_data.columns.union(temp_df.columns), axis=1)
    return telemetry_data, stop_time


def getData(props, from_time, to_time) -> pd.DataFrame:
    """ Request the timeseries data for the given properties in the given time range and 
        return it as a dataframe

    Batches are fetched with getRawBatchQuery and converted with dataToDF in a
    loop. The loop ends when a batch yields an empty dataframe or when the last
    timestamp of the batch plus 1000 ms reaches to_time; otherwise from_time is
    moved to that timestamp and the next batch is requested. Timing information
    is printed along the way.

    Args:
        props (dict of str:str) : property id:property name, 
            collection of properties for which to request data
        from_time (str) : date in the format YYYY-MM-DDTHH:MM:SSZ (e.g. 2023-09-22T20:38:53Z),
            timeseries data start time
        to_time (str) : date in the format YYYY-MM-DDTHH:MM:SSZ (e.g. 2023-09-22T20:38:53Z),
            timeseries data end time

    Returns:
        pd.DataFrame : concatenation of the dataframes of all fetched batches
    """


    #NOTE: the timer is started right before it is read, so this first print reports ~0 seconds
    start_time = time.time()
    print("--- properties: %.2f seconds ---" % (time.time() - start_time))
    
    toTS = pd.to_datetime(to_time) #timestamp of to_time
    
    telemetry_df = pd.DataFrame()
   
    while True:
        #get data
        #NOTE(review): getRawBatchQuery is called with its default sort ('DESC');
        #confirm that advancing from_time below pages through the data as intended
        start_time = time.time()
        data = getRawBatchQuery(props, from_time, to_time)
        print("--- %.2f seconds ---" % (time.time() - start_time))
        
        #convert the batch to a dataframe (stop_time is currently not used here)
        temp_start = time.time()
        temp_df, stop_time = dataToDF(data)
        
        #no data in this batch: nothing more to fetch
        if len(temp_df.index) == 0:
            break

        print("--- %.2f seconds ---" % (time.time() - temp_start))
        
        temp_start = time.time()
        #append the batch to the data collected so far
        telemetry_df = pd.concat([telemetry_df, temp_df])
        
        
        temp_start = time.time()
        #check if we got all the data, if yes break
        #if no, update from_time
        #next start = last timestamp of this batch + 1000 ms
        lastTS = temp_df['time'].iat[-1] + pd.Timedelta(1000, unit="ms")
        if lastTS >= toTS:
            break 
        print("--- %.2f seconds ---" % (time.time() - temp_start))
        
        #format the new start time the same way as the from_time argument
        from_time = lastTS.strftime('%Y-%m-%dT%H:%M:%SZ')
        #from_time = lastTS.isoformat()
        print("--- %.2f seconds ---" % (time.time() - temp_start))

        #start_time was reset at the top of this iteration, so this is the time of the iteration
        print("--- total time %.2f seconds ---" % (time.time() - start_time))
    return telemetry_df

#start of the requested time range (format YYYY-MM-DDTHH:MM:SSZ)
from_time = "2023-11-27T17:38:53Z" #get data starting from this time, this would be the start time 
#end of the requested time range
to_time = "2023-11-30T00:00:00.00Z"
#dictionary of the properties to query, fetched from the API
props = getPropertyIDs()
#timeseries of those properties in the requested time range, as one dataframe
df = getData(props, from_time, to_time) 



--- properties: 0.00 seconds ---
--- 13.34 seconds ---


### Apply transformations to data

In [None]:
#data transformation functions

def codeToAmps(x):
    """Convert a current reading from code to amp values.

    Based on a calibrated function.
    TO DO: get new function
    """
    slope, offset = 0.01082, -20.17682
    calibrated = slope * x + offset
    return calibrated * 2

def codeToVolts(x):
    """Convert a voltage reading from code to volt values.

    Based on a calibrated function.
    TO DO: get new function
    """
    slope, offset = 0.02048, -0.53823
    volts = slope * x + offset
    return volts

def metersToKM(meters):
    """Convert a meter value to kilometers."""
    meters_per_km = 1000
    return meters / meters_per_km

In [None]:
# apply data conversions

#TO DO: complete this
# Series.apply returns a new Series, so each result is assigned back to its
# column; without the assignment the converted values are discarded.
# NOTE: running this cell twice converts the already converted values again.
# NOTE(review): telemetry_table is not defined in the cells shown here (the
# fetched dataframe is named df) -- confirm which dataframe is meant.
telemetry_table['speed'] = telemetry_table['speed'].apply(metersToKM)
telemetry_table['batteryVoltage'] = telemetry_table['batteryVoltage'].apply(codeToVolts)
telemetry_table['battery'] = telemetry_table['battery'].apply(codeToAmps)

In [None]:
#TO DO: make function which appends newly fetched data to existing data
#TO DO: add graphing functions