# Code Backend

In [1]:
import quantaq
import plotly.express as px
import pandas as pd
from quantaq.utils import to_dataframe
from tqdm.notebook import tqdm
from scipy.stats import variation
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import numpy as np
import random
from scipy.stats import probplot
import plotly.graph_objects as go
from sklearn.neighbors import NearestNeighbors

In [None]:
def load_data(batch, start, end, resample_length="1min", na_limit=0.15):
    """
    `load_data` will download data from the testing chamber, report and drop units with issues connecting, and return a DataFrame. 

    :param batch: The name of the batch to pass to the API for downloading.
    :type batch: string
    :param start: The start time of the data of interest, formatted like: "2021-05-30 09:00:00".
    :type start: string
    :param end: The end time of the data of interest, formatted like: "2021-05-30 09:00:00".
    :type end: string
    :param resample_length: Interval to be used in resampling, defaults to "1min".
    :type resample_length: ???
    :param na_limit: ???
    :type na_limit: float
    :return: ???.
    :rtype: ???
    """
    sensor_cols = ['met.pressure', 'met.rh', 'met.temp', 'neph.bin0', 'neph.bin1', 'neph.bin2', 'neph.bin3', 'neph.bin4', 'neph.bin5', 'neph.pm1', 'neph.pm10', 'neph.pm25', 'opc.bin0', 'opc.bin1', 'opc.bin10', 'opc.bin11', 'opc.bin12', 'opc.bin13', 'opc.bin14', 'opc.bin15', 'opc.bin16', 'opc.bin17', 'opc.bin18', 'opc.bin19', 'opc.bin2', 'opc.bin20', 'opc.bin21', 'opc.bin22', 'opc.bin23', 'opc.bin3', 'opc.bin4', 'opc.bin5', 'opc.bin6', 'opc.bin7', 'opc.bin8', 'opc.bin9', 'opc.pm1', 'opc.pm10', 'opc.pm25', 'opc.rh', 'opc.temp']
    nonsensor_cols = ['flag', 'sn', 'timestamp', 'timestamp_local', 'url', 'geo.lat', 'geo.lon']   
    
    if steady_start==None:
        steady_start=start
    if steady_end==None:
        steady_end=end
    
    # Setup the API Client
    client = quantaq.QuantAQAPIClient()
    
    # Retrieve the devices
    devices = client.devices.list(team="Batch 3.1")
    
    frames = []
    unit_len = {}

    with tqdm(total=len(devices), desc="API Download") as pbar:
        for each in devices:
            if (np.datetime64(each['last_seen']) > np.datetime64(start)) & (np.datetime64(each['created']) < np.datetime64(end)):
                data = client.data.list(sn=each["sn"], start=start, stop=end, raw=True, per_page=500)
                frame = to_dataframe(data)
                unit_len[each["sn"]] = len(frame)
                if len(frame)>0:
                    if frame.drop(nonsensor_cols, axis=1).isna().sum().sum() == 0:
                        # Resample?
                        frame = frame.resample(resample_length, on='timestamp').mean().reset_index()
                        frame["sn"] = each["sn"]
                        # Append
                        frames.append(frame)
                    else:
                        na_frame = frame.drop(nonsensor_cols, axis=1).isna()
                        print(f"WARNING: Unit {each['sn']} returning unexpected missing values for {list(na_frame.columns[na_frame.sum()>0])}")
                else:
                    print(f"WARNING: Unit {each['sn']} not recording in timeframe")
            else:
                print(f"WARNING: Unit {each['sn']} not connecting in timeframe")
            pbar.update(1)

    frames = pd.concat(frames)

    # Set the datatype for the flag
    frames["flag"] = frames["flag"].astype("int8", errors='ignore')

    # Drop empty columns
    frames = frames.dropna(how='all', axis=1)
    
    # Pick out frames that give a lot of NaNs
#     frames_nnas = frames[["sn","opc.bin0"]].groupby("sn").apply(lambda x: (~x.isna()).sum())["opc.bin0"]
#     highna_sn = list(frames_nnas.index[frames_nnas < (n_samples * (1 - na_limit))])
    for key in unit_len: unit_len[key] = 100*unit_len[key]/max(unit_len.values())
    low_data_sns = {key:val for (key,val) in unit_len.items() if val<85}
    if len(low_data_sns) > 0:
        for sn,missing_data_pct in low_data_sns.items():
            print(f"WARNING: Unit {sn} only reports {str(missing_data_pct)[:4]}% of data")
        frames = frames[~frames["sn"].isin(list(low_data_sns.keys()))]
    
    # all non- or low-reporting units have been filtered out at the point
    sns_list = list(frames["sn"].unique())
    print(f"The number of recording units is {len(sns_list)}")
    
    # Set timestamp as index
    frames.set_index(frames["timestamp"], inplace=True)
    
    return frames


def find window(df, var):
    var_trend = df[["timestamp", var]].groupby("timestamp").quantile(q=[0.25, 0.5, 0.75])
    fig = px.line(var_trend, x="timestamp", y=var, title= f"Median {var} over time", render_mode="webgl")
    
    return fig

* TODO: produce Axis with trend and problem units
* TODO: elevate ~5 best units

In [None]:
def flag_units(df, var, steady_start=None, steady_end=None, threshold_factor=2):
    
    # build a multiindexed df with var as column, sn and timestamp as index, and restricted to the time between steady_start and steady_end
    df_steady = df[[var,"sn", "timestamp"]].copy()
    df_steady["timestamp"] = pd.to_datetime(df_steady["timestamp"])
    df_steady.set_index(["sn","timestamp"], inplace=True)
    if not (steady_start==None or steady_end==None):
        df_steady = df_steady.loc[(slice(None),slice(steady_start,steady_end)),:]
    
    # compute the scaling factors (neph_scale) needed so that each module will have the same mean over time (as in neph_scaled)
    var_scale = df_steady.groupby(level=0).mean()
    var_scale = var_scale.median()/var_scale
    var_scaled = df_steady*var_scale
    
    var_scaled_trend = var_scaled.groupby(level=1).median()
    var_std_list = (var_scaled.divide(var_scaled_trend, axis=0)).groupby(level=0).std()
    var_std_threshold = var_std_list.median()[0]*threshold_factor
    
    problem_units = var_std_list[var_std_list[var] > var_std_threshold]
    if len(problem_units)>0:
        print(f"Median standard deviation is {var_std_list.median()[0]}")
        print(f"Modules which are inconsistent in {var}:")
        print(problem_units)
        
        return list(problem_units.index), var_std_list
    else:
        print(f"No issues found with {var}.")
        return [], var_std_list

* TODO: Facilitate easy further data exploration

In [1]:
def qaqc_command(batch, start, end, steady_start=None, steady_end=None, resample_length="1min", na_limit=0.15):
    
    sensor_cols = ['met.pressure', 'met.rh', 'met.temp', 'neph.bin0', 'neph.bin1', 'neph.bin2', 'neph.bin3', 'neph.bin4', 'neph.bin5', 'neph.pm1', 'neph.pm10', 'neph.pm25', 'opc.bin0', 'opc.bin1', 'opc.bin10', 'opc.bin11', 'opc.bin12', 'opc.bin13', 'opc.bin14', 'opc.bin15', 'opc.bin16', 'opc.bin17', 'opc.bin18', 'opc.bin19', 'opc.bin2', 'opc.bin20', 'opc.bin21', 'opc.bin22', 'opc.bin23', 'opc.bin3', 'opc.bin4', 'opc.bin5', 'opc.bin6', 'opc.bin7', 'opc.bin8', 'opc.bin9', 'opc.pm1', 'opc.pm10', 'opc.pm25', 'opc.rh', 'opc.temp']
    opc_cols = ['opc.bin0', 'opc.bin1', 'opc.bin2', 'opc.bin3', 'opc.bin4', 'opc.bin5', 'opc.bin6', 'opc.bin7', 'opc.bin8', 'opc.bin9']
    nonsensor_cols = ['flag', 'sn', 'timestamp', 'timestamp_local', 'url', 'geo.lat', 'geo.lon']   
    
    if steady_start==None:
        steady_start=start
    if steady_end==None:
        steady_end=end
    
    # Setup the API Client
    client = quantaq.QuantAQAPIClient()
    
    # Retrieve the devices
    devices = client.devices.list(team="Batch 3.1")
    
    frames = []
    unit_len = {}

    with tqdm(total=len(devices), desc="API Download") as pbar:
        for each in devices:
            if (np.datetime64(each['last_seen']) > np.datetime64(start)) & (np.datetime64(each['created']) < np.datetime64(end)):
                data = client.data.list(sn=each["sn"], start=start, stop=end, raw=True, per_page=500)
                frame = to_dataframe(data)
                unit_len[each["sn"]] = len(frame)
                if len(frame)>0:
                    if frame.drop(nonsensor_cols, axis=1).isna().sum().sum() == 0:
                        # Resample?
                        frame = frame.resample(resample_length, on='timestamp').mean().reset_index()
                        frame["sn"] = each["sn"]
                        # Append
                        frames.append(frame)
                    else:
                        na_frame = frame.drop(nonsensor_cols, axis=1).isna()
                        print(f"WARNING: Unit {each['sn']} returning unexpected missing values for {list(na_frame.columns[na_frame.sum()>0])}")
                else:
                    print(f"WARNING: Unit {each['sn']} not recording in timeframe")
            else:
                print(f"WARNING: Unit {each['sn']} not connecting in timeframe")
            pbar.update(1)

    frames = pd.concat(frames)

    # Set the datatype for the flag
    frames["flag"] = frames["flag"].astype("int8", errors='ignore')

    # Drop empty columns
    frames = frames.dropna(how='all', axis=1)
    
    # Pick out frames that give a lot of NaNs
#     frames_nnas = frames[["sn","opc.bin0"]].groupby("sn").apply(lambda x: (~x.isna()).sum())["opc.bin0"]
#     highna_sn = list(frames_nnas.index[frames_nnas < (n_samples * (1 - na_limit))])
    for key in unit_len: unit_len[key] = 100*unit_len[key]/max(unit_len.values())
    low_data_sns = {key:val for (key,val) in unit_len.items() if val<85}
    if len(low_data_sns) > 0:
        for sn,missing_data_pct in low_data_sns.items():
            print(f"WARNING: Unit {sn} only reports {str(missing_data_pct)[:4]}% of data")
        frames = frames[~frames["sn"].isin(list(low_data_sns.keys()))]
    
    # all non- or low-reporting units have been filtered out at the point
    sns_list = list(frames["sn"].unique())
    print(f"The number of recording units is {len(sns_list)}")
    
    # Set timestamp as index
    frames.set_index(frames["timestamp"], inplace=True)
    
    # construct multindex df with opc data for the steady state
    opc_steady = frames[opc_cols + ['sn', "timestamp"]].copy()
    opc_steady["timestamp"] = pd.to_datetime(opc_steady["timestamp"])
    opc_steady = opc_steady.set_index(["sn", "timestamp"])
    opc_steady = opc_steady.loc[(slice(None),slice(steady_start,steady_end)),:]

    opc_scale = opc_steady.groupby(level=0).mean().sum(axis=1)
    opc_scale = opc_scale.mean()/opc_scale
    
    print("opc_scale.head")
    print(opc_scale.head())
    
    # build a multiindexed df, neph_steady, with neph.bin0 as column, sn and timestamp as index, and restricted to the time between steady_start and steady_end
    neph_steady = frames[["neph.bin0","sn", "timestamp"]].copy()
    neph_steady["timestamp"] = pd.to_datetime(neph_steady["timestamp"])
    neph_steady.set_index(["sn","timestamp"], inplace=True)
    neph_steady = neph_steady.loc[(slice(None),slice(steady_start,steady_end)),:]
    
    # compute the scaling factors (neph_scale) needed so that each module will have the same mean over time (as in neph_scaled)
    neph_scale = neph_steady.groupby(level=0).mean()
    neph_scale = neph_scale.median()/neph_scale
    neph_scaled = neph_steady*neph_scale
    
    # compute the square of the rolling standard deviation and build a Series with each module's average std
    neph_rstd_list = ((neph_scaled
                       .reset_index(["sn"])
                       .groupby("sn", as_index=False)
                       .rolling(pd.to_timedelta("5min"), win_type='gaussian')
                       .std()
                       **2
                      )
                      .groupby(level=0)
                      .mean()
                     )
    neph_rstd_threshold = neph_rstd_list.median()[0]*2
    print(f"rstd threshold: {neph_rstd_threshold}")
    print("Modules with a lot of noise in neph.bin0:")
    print(neph_rstd_list[neph_rstd_list["neph.bin0"] > neph_rstd_threshold])
    
    neph_scaled_trend = neph_scaled.groupby(level=1).median()
    neph_std_list = (neph_scaled.divide(neph_scaled_trend, axis=0)).groupby(level=0).std()
    neph_std_threshold = neph_std_list.median()[0]*2
    print("Modules which are inconsistent in neph.bin0:")
    print(neph_std_list[neph_std_list["neph.bin0"] > neph_std_threshold])
    
    return None

# Workflow

* TODO: Flag data that seems to not be in the chamber?

In [None]:
df = load_data(batch="Batch 3.1", start="2021-05-30 09:00:00", end="2021-05-31 10:00:00")

find_window(df,"neph.bin0").show()

Using the above plot, you may define an appropriate steady state window below.  Rerun for different variables as necessary.

In [None]:
steady_start =
steady_end = 

In [2]:
qaqc_command(batch="Batch 3.1", start="2021-05-30 09:00:00", end="2021-05-31 10:00:00")

API Download:   0%|          | 0/58 [00:00<?, ?it/s]

The number of recording units is 31
opc_scale.head
sn
MOD-PM-00053     0.902558
MOD-PM-00066     0.746094
MOD-PM-00075    57.467692
MOD-PM-00081    19.128658
MOD-PM-00082    17.686309
dtype: float64
rstd threshold: 536679.0445348667
Modules with a lot of noise in neph.bin0:
                 neph.bin0
sn                        
MOD-PM-00075  1.122859e+08
MOD-PM-00081  2.906104e+07
MOD-PM-00082  2.375387e+07
MOD-PM-00083  1.261647e+08
MOD-PM-00085  6.456529e+07
MOD-PM-00227  1.301058e+06
MOD-PM-00235  1.697018e+06
MOD-PM-00244  1.300832e+06
MOD-PM-00246  1.862785e+07
MOD-PM-00249  1.413026e+07
Modules which are inconsistent in neph.bin0:
              neph.bin0
sn                     
MOD-PM-00075   0.775496
MOD-PM-00081   0.471507
MOD-PM-00082   0.445035
MOD-PM-00083   0.757659
MOD-PM-00085   0.447959
MOD-PM-00227   0.214566
MOD-PM-00246   0.169934
MOD-PM-00249   0.116688


In [4]:
qaqc_command(batch="Batch 3.1", start="2021-05-08 00:00:00", end="2021-05-08 21:00:00", steady_start='2021-05-08 09:00:00', steady_end='2021-05-08 18:00:00',)

API Download:   0%|          | 0/50 [00:00<?, ?it/s]

The number of recording units is 19
sn
MOD-PM-00201    0.902044
MOD-PM-00202    1.183309
MOD-PM-00203    1.000295
MOD-PM-00205    1.162088
MOD-PM-00207    0.933364
dtype: float64
