Imports and setup the connection

In [1]:
import pandas as pd
import influxdb
from datetime import timezone, datetime
import pytz
import matplotlib.pyplot as plt
import certifi

%matplotlib inline
plt.rcParams['figure.figsize'] = [15, 5]


client = influxdb.InfluxDBClient(host='206.12.92.81',port=8086, 
                                  username='public', password='public',database='SKYSPARK')

#Hopefully this is 5.2.3, will likely run into known bug with chunking if it is 5.3.0
print(influxdb.__version__) 


5.2.3


## How to locate valid and invalid records:

Data in the SkySpark database has a unique identifying field called `id`. An entry might look like:

@p:ubcv:r:236c8a88-c9742760 Pharmacy Heating Plant HX-2 P-HX2A HX2_PHX2A_VFD_PWR(kWh)

The key thing here is the `@p:....` portion of the tag. This has not been stored in the influxDB.

The influx DB uses `groupRef`, `equipRef`, `typeRef`, `navName`, `siteRef` and `unit`\* to try to uniquely identify a sensor and link it back to SkySpark. This results in issues when one of those values is updated in skySpark (i.e. `equipRef` has 'Pharmacy' added to it). 

\*`unit` isn't necessarily useful in uniquely identifying a tag, at least in the SkySpark database for Pharmacy there are no instances where `groupRef`, `equipRef`, `typeRef`, `navName` and `siteRef` are identical but `unit` differs.


The end result is that there are many series in influxDB that can not be linked back to SkySpark so they will need to be deleted from influxDB and requeried for the update to the influxdb structure. Even those that can be linked may only have valid data starting at a specific date so older data will need to be queried and added.

1. Records in SkySpark that can not be uniquely identified by the six fields listed above (there are 185 of these)   
    - These are saved in Pharmacy-Conflicts-In-Metadata.csv   
2. Records in SkySpark that can not be found in influxDB (there are 6006 of these)   
    - These are combined with the records from part1 and saved in Pharmacy-Records-To-Requery.csv   
3. Records in SkySpark that **CAN** be found in influxDB but only after a certain date (these don't need to be deleted from influxDB, only queried from database startDate until the specific first date for which points for that sensor already exist)
    - These are saved as Pharmacy-Records-To-Keep.csv (but remember, observations from before the 'firstdate' value for these records will still need to be queried and added to influxDB!   


Note that all of the `equipRef` tags in the SkySpark data have 'Pharmacy' at the start (after the `@p:...` bit that isn't included in influxDB). Only SOME of the data in influxDB has an `equipRef` that starts with 'Pharmacy'. The code takes care of adding this to create the csvs but it is something you should consider doing to the data in influxDB to try to get like sensors grouped together 

(i.e. a sensor may have 400 points with an equipRef that starts with 'Pharmacy' and then another 6000 points with an equipRef that doesn't start with Pharmacy. It is the same sensor. So it makes sense to write a script to go in and delete+update points to get them all together in one series with and equipRef that starts with 'Pharmacy)

### 1) Records in SkySpark that can not uniquely identified by tags available in influxDB

These get saved into their own csv in case it is easier to access them that way but the data is also concatenated onto the final dataframe of SkySpark records that need to be requeried.

In [2]:
#Load export of "metadata" from skyspark for Pharmacy building (performed morning of 2020-06-05)
#This could be a query to SkySpark or to the new metadata_points measurement in influxDB
#   if you want to work through all the buildings

metadata = pd.read_csv("skyspark-pharmacy-2020-06-05-query.csv")

#Update some tags to remove the @id from them to make them play nice with the data stored in the influxDB
metadata['equipRefclean']=metadata['equipRef'].str.extract('[^ ]* (.*)', expand=True)
metadata['groupRefclean']=metadata['groupRef'].str.extract('[^ ]* (.*)', expand=True)
metadata['siteRefclean']=metadata['siteRef'].str.extract('[^ ]* (.*)', expand=True)

#Create a uniqueID by concatenating the equipRef, groupRef, navName, siteRef, bmsName (aka typeRef), and unit
metadata['uniqueId']=metadata['equipRefclean'].fillna('')+' '+metadata['groupRefclean'].fillna('')+' '+metadata['navName'].fillna('')+ \
' '+metadata['siteRefclean'].fillna('')+' '+metadata['bmsName'].fillna('')+metadata['unit'].fillna('')

#Also create a uniqueId without a unit - this ends up being more useful for joining
metadata['uniqueId_nounit']=metadata['equipRefclean'].fillna('')+' '+metadata['groupRefclean'].fillna('')+' '+metadata['navName'].fillna('')+ \
' '+metadata['siteRefclean'].fillna('')+' '+metadata['bmsName'].fillna('')+metadata['unit'].fillna('')

#Confirm that the unique ID in each data frame is actually unique!
print("There are",len(metadata),"sensors in the Pharmacy building's skyspark metadata")

print(sum(metadata.duplicated(subset=["uniqueId_nounit"], keep=False)), "conflicting entries in the dataframe of skyspark data (NOT including unit as part of unique 'key'.")
print(sum(metadata.duplicated(subset=["uniqueId"], keep=False)), "conflicting entries in the dataframe of skyspark data (including unit as part of unique 'key'). Saving as 'Pharmacy-Conflicts-In-Metadata.csv'")


(metadata[metadata.duplicated(subset=["uniqueId"], keep=False)]).to_csv("Pharmacy-Conflicts-In-Metadata.csv")


#We'll use this dataframe later for joining in influx data
metadata_no_conflicts = metadata.drop_duplicates(subset=['uniqueId_nounit'], keep=False)


  interactivity=interactivity, compiler=compiler, result=result)


There are 7938 sensors in the Pharmacy building's skyspark metadata
185 conflicting entries in the dataframe of skyspark data (NOT including unit as part of unique 'key'.
185 conflicting entries in the dataframe of skyspark data (including unit as part of unique 'key'). Saving as 'Pharmacy-Conflicts-In-Metadata.csv'


### 2) Records in SkySpark that can not be found in influxDB (there are ~6000 of them)

and

### 3) Records in SkySpark that **CAN** be found in influxDB but only after a certain date

This is a much lengthier process then part 1. First need to query the influxDB, add a uniqueId field, and do some joining with the metadata. In the process, we'll be answering part 3.

End result will be a csv with all the SkySpark records that need to be requeried over the entire date range (including those from part 1) and a csv with all the SkySpark records that need to be requeried up until (and including) the specific date listed for that records.


In [3]:
#Get a list of all buildings stored in the siteRef tag
#This code is not used but I'm leaving it here in case you want to implement it to loop through
#All buildings instead of just Pharmacy

results = client.query('SHOW TAG VALUES ON "SKYSPARK" WITH KEY = "siteRef";')
site_list=[]
for item in results["UBC_EWS"]:
    site_list.append(item["value"])
print(len(site_list),"buildings found in the siteRef tag")    

128 buildings found in the siteRef tag


#### Query influxDB to get the time stamp of the last value stored for each sensor

(assuming that a combination of `groupRef`, `equipRef`, `typeRef`, `navName`, and `unit` uniquely identify a sensor - which as seen from part 1a is not true. This is the best we can do with the data though)

In [4]:
# Query the database for all of the first time stamps for each sensor 
# (have to do first(value) to make influxDB happy even though I don't care about the value)

#JUST LOOKING AT A SINGLE BUILDING so setting site="Pharmacy"
#But you can imagine we could do a for loop here and encapsulate all the following cells of code in it
#something like: `for site in site_list:`
site="Pharmacy"

query = '''select first(value), groupRef, equipRef, typeRef, navName, unit from UBC_EWS 
where siteRef=$siteRef group By groupRef, equipRef,typeRef,navName,unit'''

where_params = {'siteRef': site}
result = client.query(query = query, bind_params=where_params, chunked=True, chunk_size=10000)

In [5]:
#Turn results of query into a dataframe
#(I wasn't able to make the influxdb-python DataFrameClient object work for a query with grouping!)
list_of_df=[]
n=0
for item in result.get_points(measurement="UBC_EWS"):
    list_of_df.append(pd.DataFrame.from_dict(item,orient="index"))

rawdf = pd.concat(list_of_df, axis=1)
rawdf = rawdf.transpose(copy=True)
rawdf.reset_index(drop=True, inplace=True)

list_of_df=None

In [6]:
#Get a date column from the timestamp 'time' column
rawdf['time'] = pd.to_datetime(rawdf['time'])
rawdf['firstdate'] = rawdf['time'].dt.date

In [7]:
print("Number of supposedly unique Pharmacy sensors in influxDB",len(rawdf))

#This is kinda like a reset point so if things get messed up in the process below


Number of supposedly unique Pharmacy sensors in influxDB 17584


#### Next we will create a uniqueId in the influx data to be able to join it with the metadata. 

The tricky bit is that in influx, many of the records don't have equipRef tags that start with "Pharmacy" while basically every equipRef tag in the SkySpark data starts with "Pharmacy"...

In [8]:
#equipRef is an issue: In influxDB some records start with 'Pharmacy' (just like in SkySpark)
# however the majority do not. This needs to be fixed to be able to link these records back to 

print(metadata_no_conflicts['equipRefclean'].str.startswith("Pharmacy ").sum(),"/",len(metadata_no_conflicts), "SkySpark records have equipRef that starts with Pharmacy")
print(rawdf['equipRef'].str.startswith("Pharmacy ").sum(),"/",len(rawdf), "influx records have equipRef that starts with Pharmacy")


7752 / 7753 SkySpark records have equipRef that starts with Pharmacy
4303 / 17584 influx records have equipRef that starts with Pharmacy


In [9]:
#Create uniqueID by concatenating specific fields in the data queried from influxDB

#Make copy so that we don't have to run a bunch of cells every time we want to make a change to the code below
influxdf = rawdf.copy() 

influxdf['equipRef_fix'] = influxdf['equipRef'].apply(lambda x: x if (x.startswith(site)) else site+" "+x)
print(influxdf['equipRef_fix'].str.startswith("Pharmacy ").sum(),"/",len(influxdf), "influx records now have equipRef_fix value that starts with Pharmacy")

influxdf['uniqueId_nounit']=influxdf['equipRef_fix'].fillna('')+' '+influxdf['groupRef'].fillna('')+' '+influxdf['navName'].fillna('')+ \
' '+site+' '+influxdf['typeRef'].fillna('')


17584 / 17584 influx records now have equipRef_fix value that starts with Pharmacy


Now that we've corrected all the sensors that didn't have 'Pharmacy' at the start of the equipRef, we need to basically regroup everything into series and end up with only the First Time stamp for each series

In [10]:
influxdf_nodup = influxdf.sort_values(by='time', ascending=True).drop_duplicates(subset='uniqueId_nounit', keep="first")
print("Influx series (sensors) before dropping newer instances of series:", len(influxdf))
print("Influx series (sensors) after retaining only the earliest instance of a series:", len(influxdf_nodup))

Influx series (sensors) before dropping newer instances of series: 17584
Influx series (sensors) after retaining only the earliest instance of a series: 8660


#### Finally can join the influx data with the SkySpark data:

In [11]:

joined_df = pd.merge(left=metadata_no_conflicts, right=influxdf_nodup, how='inner', on=['uniqueId_nounit'], suffixes=('_sspark','_influx') )
print("Number of SkySpark records after dropping conflicts:",len(metadata_no_conflicts))
print("Number of unique influx series:",len(influxdf_nodup))
print("Number of influx records that can be linked back to skyspark records:",len(joined_df))

Number of SkySpark records after dropping conflicts: 7753
Number of unique influx series: 8660
Number of influx records that can be linked back to skyspark records: 1747


#### Now create some csvs of the results

In [12]:
#Select only the useful fields for identifying the records
joined_df=joined_df[['uniqueId_nounit','id','firstdate','time','groupRef_influx',
                     'equipRef_fix','equipRef_influx','navName_influx','siteRef','typeRef','unit_sspark','unit_influx','kind']]

#Save the records in a .csv

print("Influx data that only needs to be requeried before 'firstdate' is saved as 'Pharmacy-Records-To-Keep.csv'")

joined_df.to_csv("Pharmacy-Records-To-Keep.csv")

#Also make a csv of all the records that need to be re-queried in their entirety. 
# Using in indicator argument allows us to do a join and then only keep the records that only belong
# to the left dataframe. Super handy!

nokeep = pd.merge(left=metadata_no_conflicts, right=influxdf_nodup, on=['uniqueId_nounit'], how='left',indicator=True)
nokeep = nokeep[nokeep['_merge'] == 'left_only'][['id','groupRefclean','equipRefclean','siteRef','navName_x','bmsName','unit_x','kind']]


#Add the problematic records back in - the ones that can't be uniquely indexed by the main identifying fields used in influx
conflicting_records = metadata[metadata.duplicated(subset=["uniqueId_nounit"], keep=False)]['id']
print("Number of SkySpark records with no linkable instance in influx data (not including the 185 conflicts in SkySpark):",len(nokeep))
print("Number of SkySpark records with no linkable instance in influx data",len(nokeep)+len(conflicting_records))
print("Saving as 'Pharmacy-Records-To-Requery.csv'")
pd.concat([nokeep,conflicting_records], axis=0).reset_index().to_csv("Pharmacy-Records-To-Requery.csv")

Influx data that only needs to be requeried before 'firstdate' is saved as 'Pharmacy-Records-To-Keep.csv'
Number of SkySpark records with no linkable instance in influx data (not including the 185 conflicts in SkySpark): 6006
Number of SkySpark records with no linkable instance in influx data 6191
Saving as 'Pharmacy-Records-To-Requery.csv'


  result = result.union(other)
