# Database API example for N-CMAPSS dataset
- loads the datasets you choose,
- creates a single dataframe,
- resamples (assigns a synthetic 1-second datetime index),
- saves the result to the database
#### Do this here, then use another notebook for individual tasks
### imports

In [1]:
import os
import sys
import pickle
import h5py
import time
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.stats as stats
import math
import random
import gc
import string
import multiprocessing as mp
import tensorflow as tf
import json
from time import time
base_dir = os.path.dirname(os.getcwd())

sys.path.insert(1, base_dir)
from package.api import DB as api
import package.utils as utils

%matplotlib inline
%load_ext autoreload
%autoreload 2

## load all data

In [69]:
h5_dir = 'data_h5'
fnames = [
    'N-CMAPSS_DS01-005.h5',
    'N-CMAPSS_DS03-012.h5',
    'N-CMAPSS_DS04.h5',
    'N-CMAPSS_DS05.h5',
    'N-CMAPSS_DS06.h5',
    'N-CMAPSS_DS07.h5',
    'N-CMAPSS_DS08a-009.h5',
    'N-CMAPSS_DS08c-008.h5'
]

sets = ['dev', 'test']

# NOTICE: for testing purposes only the first MAX_FILES files are loaded
# (the whole dataset is very large as a single dataframe).
# Set MAX_FILES = None to load every file in `fnames`.
MAX_FILES = 1


def _decode_labels(hdf, key):
    """Return the byte-string variable names stored under `key` in an open h5 file as str."""
    return [label.decode('utf-8') for label in np.array(hdf.get(key))]


frames = []     # one frame per (file, set); concatenated once after the loop
asset_id = 1    # running counter: unit numbers restart in every file/set

for filename in fnames[:MAX_FILES]:
    print(filename)
    dataset_name = filename.split('_')[1].split('.')[0]   # e.g. 'DS01-005'
    for _set in sets:
        print(_set)
        with h5py.File(os.path.join(base_dir, h5_dir, filename), 'r') as hdf:
            a_data = np.array(hdf.get(f"A_{_set}"))
            w_data = np.array(hdf.get(f"W_{_set}"))
            x_data = np.array(hdf.get(f"X_s_{_set}"))
            v_data = np.array(hdf.get(f"X_v_{_set}"))
            t_data = np.array(hdf.get(f"T_{_set}"))
            y_data = np.array(hdf.get(f"Y_{_set}"))

            a_labels = _decode_labels(hdf, 'A_var')
            w_labels = _decode_labels(hdf, 'W_var')
            x_labels = _decode_labels(hdf, 'X_s_var')
            v_labels = _decode_labels(hdf, 'X_v_var')
            t_labels = _decode_labels(hdf, 'T_var')

        df_a = pd.DataFrame(data=a_data, columns=a_labels)
        df_a['asset_id'] = -1
        df_a['dataset'] = dataset_name
        df_w = pd.DataFrame(data=w_data, columns=w_labels)
        df_x = pd.DataFrame(data=x_data, columns=x_labels)
        df_v = pd.DataFrame(data=v_data, columns=v_labels)
        df_t = pd.DataFrame(data=t_data, columns=t_labels)
        df_y = pd.DataFrame(data=y_data, columns=['y'])
        print(f"<{filename}> : {pd.unique(df_a.unit)}")
        # give every unit of this file/set a globally unique asset_id
        for n in pd.unique(df_a.unit):
            df_a.loc[df_a['unit'] == n, 'asset_id'] = asset_id
            asset_id += 1

        frames.append(pd.concat([df_a, df_y, df_w, df_x, df_v, df_t], axis=1))
        # free the raw arrays/frames of this set before loading the next one
        del df_a, df_w, df_x, df_v, df_t, df_y, a_data, w_data, t_data, x_data, v_data, y_data

# Concatenate once (growing a frame inside the loop is quadratic).
# ignore_index=True: every per-set frame is indexed from 0, so stacking them
# without it leaves duplicate index labels.
df = pd.concat(frames, axis=0, ignore_index=True)
del frames
gc.collect()

df = df.round(5).astype({'asset_id': int, 'unit': int, 'cycle': int, 'hs': int, 'Fc': int})

N-CMAPSS_DS01-005.h5
dev
<N-CMAPSS_DS01-005.h5> : [1. 2. 3. 4. 5. 6.]
test
<N-CMAPSS_DS01-005.h5> : [ 7.  8.  9. 10.]


## get the labels

In [3]:
# Keep the T_var names (printed first below) as y_labels, then rebind
# t_labels to the W_var names followed by the X_s_var names.
# NOTE: not idempotent -- running this cell twice puts the rebound t_labels
# into y_labels. Re-run the data-loading cell before re-running this one.
y_labels = t_labels
t_labels = [*w_labels, *x_labels]
for labels in (y_labels, t_labels, v_labels):
    print(labels)
print(v_labels)

['fan_eff_mod', 'fan_flow_mod', 'LPC_eff_mod', 'LPC_flow_mod', 'HPC_eff_mod', 'HPC_flow_mod', 'HPT_eff_mod', 'HPT_flow_mod', 'LPT_eff_mod', 'LPT_flow_mod']
['alt', 'Mach', 'TRA', 'T2', 'T24', 'T30', 'T48', 'T50', 'P15', 'P2', 'P21', 'P24', 'Ps30', 'P40', 'P50', 'Nf', 'Nc', 'Wf']
['T40', 'P30', 'P45', 'W21', 'W22', 'W25', 'W31', 'W32', 'W48', 'W50', 'SmFan', 'SmLPC', 'SmHPC', 'phi']


## create the augmented auxiliary data by aggregating over units
#### NOTE: "asset_id" assigns unique numbers to the units across all datasets, since the unit numbers restart in each dataset.
#### NOTE: this value does not reflect the asset's true ID stored in the database if there are already units in the database (this has no impact; just FYI).

In [4]:
# One row per asset: its flight class (Fc -> group_id), unit number, source
# dataset, and first/last cycle. 'age' is the first cycle shifted to start at 0.
df_aux = (
    df.groupby('asset_id')
      .agg(group_id=('Fc', 'max'),
           unit=('unit', 'max'),
           dataset=('dataset', 'max'),
           age=('cycle', 'min'),
           eol=('cycle', 'max'))
      .reset_index()
)
df_aux['age'] = df_aux['age'] - 1.0
df_aux.head()

Unnamed: 0,asset_id,group_id,unit,dataset,age,eol
0,1,1,1,DS01-005,0.0,100
1,2,3,2,DS01-005,0.0,75
2,3,2,3,DS01-005,0.0,100
3,4,1,4,DS01-005,0.0,95
4,5,3,5,DS01-005,0.0,89


## connect to db

In [5]:
# THESE ARE YOUR CREDENTIALS IN PLAIN TEXT!
params = utils.get_aws_secret("/secret/ncmapssdb")
#print(params)
db, cur =  api.connect(params)
db.set_session(autocommit=True)
del(params)

[INFO] connecting to db.
[INFO] connected.


## create asset type
### NOTE: if the asset type already exists, the function simply returns it.
### BIG NOTE: asset_type and subtype combine to form the component's table name, so here the table is engine_ncmapss_tb.

In [6]:
# Register (or fetch, if already present) the engine/ncmapss asset type.
asset_type = api._create_asset_type(
    asset_type='engine',
    subtype='ncmapss',
    description='turbine engine from N-CMAPSS dataset unit',
    db=db,
    cur=cur,
)
print(asset_type)

[INFO] asset_type already exists.
   id    type  subtype                                description
0   1  engine  ncmapss  turbine engine from N-CMAPSS dataset unit


### make some serial numbers

In [None]:
# One random 8-character serial number per aggregated unit (row of df_aux).
serial_numbers = []
for _ in range(len(df_aux)):
    serial_numbers.append(utils.generate_serial_number(length=8))

In [None]:
# Optionally persist the serial numbers so that re-running the notebook reuses
# the same serials instead of minting new random ones (which would presumably
# register the same units again under different serials).
# Off by default (no-op); set PERSIST_SERIAL_NUMBERS = True to enable.
# The former commented-out draft wrote to `csv_dir + 'serial_numbers.txt'`
# (csv_dir is undefined) but read from 'serial_numbers.txt'; one path is used here.
PERSIST_SERIAL_NUMBERS = False
SERIAL_NUMBERS_PATH = 'serial_numbers.txt'

if PERSIST_SERIAL_NUMBERS:
    if os.path.exists(SERIAL_NUMBERS_PATH):
        with open(SERIAL_NUMBERS_PATH, "r") as f:
            serial_numbers = [sn.strip() for sn in f if sn.strip()]
    else:
        # first run: save the serials generated in the previous cell
        with open(SERIAL_NUMBERS_PATH, "w") as f:
            f.writelines(f"{sn}\n" for sn in serial_numbers)
    if len(serial_numbers) != len(df_aux):
        raise ValueError(
            f"{SERIAL_NUMBERS_PATH} has {len(serial_numbers)} serial numbers "
            f"but df_aux has {len(df_aux)} units"
        )

## create assets and components
#### this could be rewritten as a function for use with df_aux.apply()....

In [None]:
# Register one asset, plus its component row, per aggregated unit in df_aux,
# pairing each unit with the serial number at the same position.
# NOTE(review): re-running this cell creates the assets again.
assert len(serial_numbers) == len(df_aux), "need exactly one serial number per unit"

ncmapss_type_id = int(asset_type.id.values[0])  # loop-invariant
for row, serial_number in zip(df_aux.itertuples(index=False), serial_numbers):
    asset = api._create_asset(type_id=ncmapss_type_id,
                              common_name='ncmapss unit',
                              age=float(row.age),
                              eol=float(row.eol),
                              rul=float(row.eol - row.age),
                              units='cycles',
                              serial_number=serial_number,
                              db=db,
                              cur=cur)
    print(asset)

    # Cast to built-in types, as done for the asset fields above, so the DB
    # layer never receives numpy scalars.
    component = api._create_component(asset=asset,
                                      group_id=int(row.group_id),
                                      unit=int(row.unit),
                                      dataset=str(row.dataset),
                                      db=db,
                                      cur=cur)
    print(component)

### convert index to datetime 
- given there is no time information with the provided data, set the interval at your discretion (ex: 1 second)

#### grab the largest id already in the db and start numbering after it (assuming the current dataframe does not contain any records already in the database)
#### so the "id" column starts at 1 only on the first load into an empty database

In [7]:
# New ids continue after the largest id already stored in summary_tb.
# On an empty table max(id) is NULL, which may arrive as None or NaN
# depending on how api.execute builds its frame -- pd.isna covers both.
start_id = api.execute("select max(id) from summary_tb;", db).values[0][0]
start_id = 0 if pd.isna(start_id) else int(start_id)

In [70]:
# `start_id` comes from the previous cell (largest id already in summary_tb).
# It used to be reset to 0 here, which discarded that value and made the
# generated ids collide with rows already in the database.

# Drop columns added by an earlier run so this cell can be re-run safely.
df = df.drop(columns=['id', 'dt'], errors='ignore').reset_index(drop=True)
n_rows = len(df)

# Synthetic timestamps, one per second from 1970-01-01, based on row position
# rather than df.index (the index may repeat across concatenated frames, which
# would give duplicate timestamps). Stored as '%Y-%m-%d %H:%M:%S' strings.
timestamps = pd.to_datetime(np.arange(n_rows), unit='s', origin='unix')
df.insert(0, 'dt', timestamps.strftime('%Y-%m-%d %H:%M:%S').to_numpy())
df.insert(0, 'id', np.arange(start_id + 1, start_id + 1 + n_rows))
df.head()

Unnamed: 0,id,dt,unit,cycle,Fc,hs,asset_id,dataset,y,alt,...,fan_eff_mod,fan_flow_mod,LPC_eff_mod,LPC_flow_mod,HPC_eff_mod,HPC_flow_mod,HPT_eff_mod,HPT_flow_mod,LPT_eff_mod,LPT_flow_mod
0,1,1970-01-01 00:00:00,1,1,1,1,1,DS01-005,99,3013.0,...,0.0,0.0,0.0,0.0,0.0,0.0,-0.0006,0.0,0.0,0.0
1,2,1970-01-01 00:00:01,1,1,1,1,1,DS01-005,99,3020.0,...,0.0,0.0,0.0,0.0,0.0,0.0,-0.0006,0.0,0.0,0.0
2,3,1970-01-01 00:00:02,1,1,1,1,1,DS01-005,99,3025.0,...,0.0,0.0,0.0,0.0,0.0,0.0,-0.0006,0.0,0.0,0.0
3,4,1970-01-01 00:00:03,1,1,1,1,1,DS01-005,99,3035.0,...,0.0,0.0,0.0,0.0,0.0,0.0,-0.0006,0.0,0.0,0.0
4,5,1970-01-01 00:00:04,1,1,1,1,1,DS01-005,99,3043.0,...,0.0,0.0,0.0,0.0,0.0,0.0,-0.0006,0.0,0.0,0.0


# ~~NOTE it is up to you to ensure you do not exceed your system memory when using batch insert~~
#### handled in function now
- the data tables are broken down slightly differently than they are presented in the dataset; see the table schema
- summary_tb: id (auto generated), asset_id, cycle, alt, Mach, TRA, T2
- telemetry_tb: dt (timestamp or datetime), all of the telemetry columns
- degradation_tb: dt (timestamp or datetime), all of the degradation columns and health state

## insert summary data first, since telemetry and degradation index off it

In [None]:
# summary_tb goes in first: the telemetry and degradation rows index off it.
# Only the columns that exist in the table schema are sent.
summary_cols = api.get_fields('summary_tb', as_list=True, db=db)
print(f"summary_cols: {summary_cols}")
api.batch_insert(
    df=df[summary_cols],
    tb='summary_tb',
    db=db,
    cur=cur,
)


## Insert telemetry and degradation data

In [None]:
# Send only the telemetry_tb schema columns; num_batches (default 10) and
# verbose (default False) are left at their defaults.
telemetry_cols = api.get_fields('telemetry_tb', as_list=True, db=db)
print(f"telemetry_cols: {telemetry_cols}")
api.batch_insert(
    df=df[telemetry_cols],
    tb='telemetry_tb',
    db=db,
    cur=cur,
)

In [76]:
# Send only the degradation_tb schema columns, in 10 batches with progress output.
# NOTE(review): the captured output shows batches 0..10 (11 messages) for
# num_batches=10 -- worth checking batch_insert's batch arithmetic.
degradation_cols = api.get_fields('degradation_tb', as_list=True, db=db)
print(f"degradation_cols: {degradation_cols}")
api.batch_insert(
    df=df[degradation_cols],
    tb='degradation_tb',
    num_batches=10,
    db=db,
    cur=cur,
    verbose=True,
)

degradation_cols: ['id', 'fan_eff_mod', 'fan_flow_mod', 'LPC_eff_mod', 'LPC_flow_mod', 'HPC_eff_mod', 'HPC_flow_mod', 'HPT_eff_mod', 'HPT_flow_mod', 'LPT_eff_mod', 'LPT_flow_mod']
inserting batch 0 of 10...
inserting batch 1 of 10...
inserting batch 2 of 10...
inserting batch 3 of 10...
inserting batch 4 of 10...
inserting batch 5 of 10...
inserting batch 6 of 10...
inserting batch 7 of 10...
inserting batch 8 of 10...
inserting batch 9 of 10...
inserting batch 10 of 10...


7641868

# Putting it all together...
## TODO (Tim): create a function in utils that does this for the entire dataset
In this notebook, I put a break statement after the first h5 file was processed to limit memory usage during development. The entire dataset as a dataframe uses about 40 GB. These steps should be run after each dataset is loaded, or the num_batches parameter should be increased to 50 or 100.

## Misc usage

In [None]:
# Same description as the main "create asset type" cell, so running this cell
# first on a fresh database registers the type identically.
api._create_asset_type(asset_type='engine',
                       subtype='ncmapss',
                       description='turbine engine from N-CMAPSS dataset unit',
                       db=db,
                       cur=cur)

In [None]:
# list the tables present in the connected database
db_tables = api.get_tables(db)
print(db_tables)

In [None]:
# column names of asset_tb, displayed as the cell output
asset_tb_cols = api.get_fields('asset_tb', db=db, as_list=True)
asset_tb_cols

In [None]:
# Component tables are named <asset_type>_<subtype>_tb, so the N-CMAPSS engine
# components live in engine_ncmapss_tb (the former 'engine_tb' did not follow
# that convention).
engine_tb_cols = api.get_fields('engine_ncmapss_tb', as_list=True, db=db)
engine_tb_cols

In [None]:
# Look up a single asset by serial number.
# NOTE(review): this serial is 10 characters while this notebook generates
# 8-character serials, so it presumably refers to an asset from an earlier
# load; use an entry of `serial_numbers` to look up one created here.
api._get_asset(db=db, serial_number='sd3kg0dk00')

In [None]:
# The component table name is <type>_<subtype>_tb of the current asset type.
_type, _subtype = asset_type.type.values[0], asset_type.subtype.values[0]
api.table_exists(f"{_type}_{_subtype}_tb", db)

In [None]:
# Fetch the asset type by (type, subtype) and show what kind of object comes back.
asset_type_id = api._get_asset_type(db=db, asset_type='engine', subtype='ncmapss')
for shown in (asset_type_id, type(asset_type_id)):
    print(shown)

In [None]:
# Fetch an asset type by its id. Bound to its own name so the `asset_type`
# used by the earlier asset-creation and table_exists cells is not silently
# replaced when this exploratory cell is run.
asset_type_by_id = api._get_asset_type(type_id=1, db=db)
print(asset_type_by_id)
print(type(asset_type_by_id))