*Created: 23 February 2022*<br>
*Last Update: 8 April 2022*<br>
*Author: Yasir Abdur Rohman*<br>
**Property of PT Indonesia Power & Lab Getaran & Diagnosis Mesin Undip**

---

# Pipeline Deployment on Notebook
Case: SLA5 - Local Bearing Problem

In [12]:
from estimate.vbm import VBM
from residual.residual import Residual
from utils.pihelper import PIHelper
from utils.transform import TransformData
from utils.model import Model
from utils.db import insert_to_table_runtime
from utils.threshold import get_actual_low, get_actual_high, get_sensor_id
from utils.threshold import get_residual_positive_threshold, get_residual_negative_threshold

from db.base import session
from db.diagnostic.models.model_fault import Fault
from db.assets.models.model_assets import Assets
from db.assets.models.model_asset_failure import AssetFailure
from db.diagnostic.models.model_model_tag import ModelTag
from db.diagnostic.models.model_priority import Priority
from db.diagnostic.models.model_diagnose_date import DiagnoseDate

from osisoft.pidevclub.piwebapi.models import PIAnalysis, PIItemsStreamValues, PIStreamValues, PITimedValue, PIRequest

import pandas as pd
import time
import asyncio
from datetime import timedelta, date, datetime

In [3]:
# Create the PI helper and obtain the client object used by the later cells.
pihelper = PIHelper()
client = pihelper.connect_client()

# Every monitored signal lives under the same path prefix; `sensor` lists the
# signal names and `paths` holds the prefix joined to each of them, in order.
parent = "af:\\\\pi1\\SLA5."
sensor = [
    'Generator Gross Capacity',
    'Turbine Lube Oil Cooler Outlet Temperature',
    'Turbine.Bearing 1 Drain Oil Temperature',
    'Turbine.Bearing 1 Metal Temperature',
    'Turbine.Bearing 1X Vibration',
    'Turbine.Bearing 1Y Vibration',
    'Turbine.Bearing 2 Drain Oil Temperature',
    'Turbine.Bearing 2 Metal Temperature',
    'Turbine.Bearing 2X Vibration',
    'Turbine.Bearing 2Y Vibration',
]

paths = [parent + name for name in sensor]

In [4]:
# extract and transform data

def etl(paths, start_time, end_time, interval):
    """Extract interpolated values for *paths* and transform them.

    Uses the module-level ``client`` to request interpolated values between
    ``start_time`` and ``end_time`` at the given ``interval``, then passes the
    result through ``TransformData.reduce_columns`` (with the module-level
    ``sensor`` list) and ``TransformData.transform``.

    Returns whatever ``TransformData.transform`` produces; the sample output
    in this notebook shows a dict with ``date``, ``sensors`` and ``actuals``
    keys for a single-timestamp query.
    """
    # extract
    raw = client.data.get_multiple_interpolated_values(
        paths,
        start_time=start_time,
        end_time=end_time,
        interval=interval,
    )
    # transform
    shaper = TransformData()
    return shaper.transform(shaper.reduce_columns(raw, sensor))

In [5]:
# Single-sample check: the same timestamp is used as both the start and the
# end of the query window.
start_time = "2022-03-09 22:39:00"
# end_time = '2020-01-01 00:01:00'
interval = "1m"

etl(paths, start_time=start_time, end_time=start_time, interval=interval)

{'date': '2022-03-09 22:39:00',
 'sensors': ['Generator Gross Capacity',
  'Turbine Lube Oil Cooler Outlet Temperature',
  'Turbine.Bearing 1 Drain Oil Temperature',
  'Turbine.Bearing 1 Metal Temperature',
  'Turbine.Bearing 1X Vibration',
  'Turbine.Bearing 1Y Vibration',
  'Turbine.Bearing 2 Drain Oil Temperature',
  'Turbine.Bearing 2 Metal Temperature',
  'Turbine.Bearing 2X Vibration',
  'Turbine.Bearing 2Y Vibration'],
 'actuals': [592.164856,
  46.050465,
  54.073532,
  85.784355,
  0.06219,
  0.044496,
  54.294548,
  88.5381,
  0.034864,
  0.039652]}

In [6]:
# PI points that receive the diagnostic output (recommendation, priority and
# fault) for Turbine Bearing 1.
# Every backslash is escaped explicitly: the previous literals relied on the
# unrecognised escape "\S", which Python only tolerates with a warning. The
# resulting runtime strings are unchanged.
point1 = client.point.get_by_path("\\\\PI1\\SLA5.Turbine.Bearing 1 recommendation prescriptive prediction", None)
point2 = client.point.get_by_path("\\\\PI1\\SLA5.Turbine.Bearing 1 priority prescriptive prediction", None)
point3 = client.point.get_by_path("\\\\PI1\\SLA5.Turbine.Bearing 1 fault prescriptive prediction", None)

def _timed_value(timestamp, value):
    """Build a fresh PITimedValue holding *value* at *timestamp*."""
    item = PITimedValue()
    item.value = value
    item.timestamp = timestamp
    return item


def upload_recommendation(batch_data, point1, point2, point3):
    """Upload a batch of diagnostic results to three PI points.

    Args:
        batch_data: iterable of rows indexed as
            ``(timestamp, fault, priority, recommendation)``.
        point1: PI point that receives the recommendation values.
        point2: PI point that receives the priority values.
        point3: PI point that receives the fault values.

    Returns:
        The response of ``client.streamSet.update_values_ad_hoc_with_http_info``,
        or ``None`` when ``batch_data`` is empty (no request is sent).
    """
    rows = list(batch_data)
    if not rows:
        # Nothing to upload; avoid sending streams without web_id or items.
        return None

    recommendation_values = []
    priority_values = []
    fault_values = []
    for row in rows:
        timestamp, fault, priority, recommendation = row[0], row[1], row[2], row[3]
        # A new PITimedValue per row and per stream: reusing one object would
        # make every list entry alias the values of the last row.
        recommendation_values.append(_timed_value(timestamp, recommendation))
        priority_values.append(_timed_value(timestamp, priority))
        fault_values.append(_timed_value(timestamp, fault))

    stream_values = []
    for point, items in (
        (point1, recommendation_values),
        (point2, priority_values),
        (point3, fault_values),
    ):
        stream = PIStreamValues()
        stream.web_id = point.web_id
        stream.items = items
        stream_values.append(stream)

    return client.streamSet.update_values_ad_hoc_with_http_info(stream_values, None, None)

# Loop

In [7]:
# define asset and fault
unit = 'SLA5'
bearing = 'Bearing 1'
fault = 'Local Bearing'
asset = session.query(Assets).get(2) #get asset->id=2

# Threshold
actual_low = get_actual_low(sensor)
actual_high = get_actual_high(sensor)
sensor_id = get_sensor_id(sensor)
residual_positive_threshold = get_residual_positive_threshold(sensor_id)
residual_negative_threshold = get_residual_negative_threshold(sensor_id)

In [17]:
async def execute_diagnostic_bearing(paths, current_time=None, interval='1m'):
    """Wait until the minute *current_time* has passed, then fetch its data.

    Args:
        paths: paths forwarded to ``etl``.
        current_time: timestamp string formatted as ``'%Y-%m-%d %H:%M:%S'``.
            When omitted, the minute after ``DiagnoseDate._get_last_diagnose()``
            is used.
        interval: interval forwarded to ``etl``.

    Returns:
        The result of ``etl(paths, current_time, current_time, interval)``.
    """
    if current_time is None:
        next_minute = DiagnoseDate._get_last_diagnose() + timedelta(minutes=1)
        current_time = next_minute.strftime('%Y-%m-%d %H:%M:%S')

    # Poll until the wall clock has moved past the requested minute. Both
    # strings are zero-padded, so >= compares them chronologically.
    # asyncio.sleep yields to the event loop instead of blocking it.
    while current_time >= datetime.now().strftime('%Y-%m-%d %H:%M:00'):
        await asyncio.sleep(5)

    # get data
    return etl(paths, current_time, current_time, interval)


async def execute_diagnostic_unit(paths):
    """Run the per-bearing fetch for the next undiagnosed minute.

    Starts from the minute after ``DiagnoseDate._get_last_diagnose()`` and
    returns the following minute formatted as ``'%Y-%m-%d %H:%M:%S'``.
    """
    # Keep the datetime object separately so the final +1 minute is not
    # applied to a string.
    current_dt = DiagnoseDate._get_last_diagnose() + timedelta(minutes=1)
    current_time = current_dt.strftime('%Y-%m-%d %H:%M:%S')

    # NOTE(review): both calls receive the same `paths`, and the fetched data
    # is not used yet — confirm the intended per-bearing inputs.
    print('bearing 1 running..')
    await execute_diagnostic_bearing(paths, current_time, interval)
    print('bearing 2 running..')
    await execute_diagnostic_bearing(paths, current_time, interval)

    return (current_dt + timedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')

# asyncio.run(execute_diagnostic_unit(paths))

In [8]:
# Catch-up loop: run the diagnostic pipeline one minute at a time, starting one
# minute after the last diagnosed timestamp stored in the database.
# The database writes (runtime insert, AssetFailure, diagnose_date) are
# commented out in this cell; only the upload to PI is active.
current_time = DiagnoseDate._get_last_diagnose() + timedelta(minutes=1)
current_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
# current_time = '2022-03-14 20:02:00'
temp_upload = []  # only referenced by the commented-out batch upload below

while True:
    # if current_time == '2022-03-10 13:00:00':
    #     break
    now_time = datetime.now().strftime('%Y-%m-%d %H:%M:00')
    # If current_time has reached the current wall-clock minute, do nothing
    # (wait). Otherwise run the diagnostic process and advance current_time by
    # one minute. Both strings are zero-padded, so >= compares them
    # chronologically and also keeps the loop from running into the future
    # when the stored timestamp has non-zero seconds.
    if current_time >= now_time:
        time.sleep(5)
        continue

    try:
        # get data
        data = etl(paths, current_time, current_time, interval)
        actuals = data['actuals']

        # load model (state matrix)
        model = Model(current_time, unit, bearing, fault)
        state_matrix = model.load_state_matrix()

        # estimate with VBM
        vbm = VBM(actual_high, actual_low)
        estimates, state_matrix = vbm.estimate_sensors(actuals, state_matrix)

        # update model (state matrix)
        model.update_state_matrix(state_matrix)

        # calculate residual
        residual_indication_positives = []
        residual_indication_negatives = []
        residuals = []
        for i, actual in enumerate(actuals):
            resid = Residual(actual, estimates[i], residual_positive_threshold[i], residual_negative_threshold[i])
            residuals.append(resid.residual)
            residual_indication_positives.append(resid.residual_indication_positive)
            residual_indication_negatives.append(resid.residual_indication_negative)

        # construct the rows for table runtime (one row per sensor)
        values = [
            (current_time, sensor_id[i], fault, actuals[i], None, estimates[i], residuals[i], None,
             residual_indication_positives[i], residual_indication_negatives[i])
            for i in range(len(sensor))
        ]
        # insert into table runtime
        # insert_to_table_runtime(values)

        # diagnostic process
        # get all diagnostic rule expressions for the fault
        diag_rule = session.query(Fault).get(fault).diagnostics.all()
        # pick the rule for this asset; reset first so a rule from a previous
        # iteration is never reused silently
        diag = None
        for d in diag_rule:
            if d.get_asset() == asset.id:
                diag = d
        if diag is None:
            raise LookupError(f'no diagnostic rule for fault {fault!r} on asset id {asset.id}')

        # determine priority
        current_dt = datetime.strptime(current_time, '%Y-%m-%d %H:%M:%S')
        priority = diag.calculate(current_dt, fault)
        print(f'current time: {current_dt} with priority = {priority}')

        # NOTE(review): the 7-hour shift presumably converts local time to UTC
        # before uploading to PI — confirm.
        upload_time = current_dt - timedelta(hours=7)
        if int(priority) > 0:
            # print("=== FAILURE DETECTED ===")
            fault_object = AssetFailure(date=current_dt,
                                        asset_id=asset.id,
                                        fault=diag.fault,
                                        priority=int(priority))
            # session.add(fault_object)
            # session.commit()

            recommendation = session.query(Priority.recomendation).filter_by(priority=priority, fault=fault).first()[0]
            # print(f'Recommendation: {recommendation}')
            # upload to PI: timestamp, fault, priority, recommendation
            status = upload_recommendation([(upload_time, fault, priority, recommendation)],
                                           point1, point2, point3)
        else:
            status = upload_recommendation([(upload_time, '', priority, '')],
                                           point1, point2, point3)
            print(status)
            # check whether a batch boundary has been reached
            # if current_time.strftime('%Y-%m-%d %H:%M:%S').split()[1] in ['00:00:00','06:00:00','12:00:00','18:00:00']:
                # status = upload_recommendation(temp_upload)
                # print(status)
                # print(f"batch data {current_time.strftime('%Y-%m-%d %H:%M:%S')} telah diupload ke pi system!")
                # temp_upload.clear()
            # else:
                # temp_upload.append((current_time - timedelta(hours=7), fault, priority, recommendation))
    except Exception:
        # Leave the shared session usable for the next run of this cell; an
        # un-rolled-back failure otherwise surfaces as PendingRollbackError.
        session.rollback()
        raise

    # update diagnose_date
    # dgdate = session.query(DiagnoseDate).first()
    # dgdate.timestamp = current_time
    # session.commit()

    # update current time
    current_time = (current_dt + timedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
    # print()
    # print()

PendingRollbackError: Can't reconnect until invalid transaction is rolled back. (Background on this error at: https://sqlalche.me/e/14/8s2b)

### Looping untuk real-time
Looping dijalankan secara real-time; proses diagnostik hanya dilakukan satu kali untuk setiap menit.

> Catatan: looping pada sel di atas harus sudah dijalankan hingga mencapai waktu saat ini sebelum sel ini dijalankan.

In [16]:
# Real-time loop: diagnose one minute at a time, starting one minute after the
# last diagnosed timestamp stored in the database, and persist the results.
current_time = DiagnoseDate._get_last_diagnose() + timedelta(minutes=1)
current_time = current_time.strftime('%Y-%m-%d %H:%M:%S')

while True:
    now_time = datetime.now().strftime('%Y-%m-%d %H:%M:00')
    # If current_time has reached the current wall-clock minute, do nothing
    # (wait). Otherwise run the diagnostic process and advance current_time by
    # one minute. Both strings are zero-padded, so >= compares them
    # chronologically and also keeps the loop from running into the future
    # when the stored timestamp has non-zero seconds.
    if current_time >= now_time:
        time.sleep(5)
        continue

    try:
        # get data
        data = etl(paths, current_time, current_time, interval)
        actuals = data['actuals']

        # load model (state matrix)
        # NOTE(review): the catch-up cell calls
        # Model(current_time, unit, bearing, fault) — confirm which signature
        # is current.
        model = Model(current_time)
        state_matrix = model.load_state_matrix()

        # estimate with VBM
        vbm = VBM(actual_high, actual_low)
        estimates, state_matrix = vbm.estimate_sensors(actuals, state_matrix)

        # update model (state matrix)
        model.update_state_matrix(state_matrix)

        # calculate residual
        residual_indication_positives = []
        residual_indication_negatives = []
        residuals = []
        for i, actual in enumerate(actuals):
            resid = Residual(actual, estimates[i], residual_positive_threshold[i], residual_negative_threshold[i])
            residuals.append(resid.residual)
            residual_indication_positives.append(resid.residual_indication_positive)
            residual_indication_negatives.append(resid.residual_indication_negative)

        # construct the rows for table runtime (one row per sensor)
        values = [
            (current_time, sensor_id[i], fault, actuals[i], None, estimates[i], residuals[i], None,
             residual_indication_positives[i], residual_indication_negatives[i])
            for i in range(len(sensor))
        ]
        # insert into table runtime
        insert_to_table_runtime(values)

        # diagnostic process
        # get all diagnostic rule expressions for the fault
        diag_rule = session.query(Fault).get(fault).diagnostics.all()
        # pick the rule for this asset; reset first so a rule from a previous
        # iteration is never reused silently
        diag = None
        for d in diag_rule:
            if d.get_asset() == asset.id:
                diag = d
        if diag is None:
            raise LookupError(f'no diagnostic rule for fault {fault!r} on asset id {asset.id}')

        # determine priority
        current_dt = datetime.strptime(current_time, '%Y-%m-%d %H:%M:%S')
        priority = diag.calculate(current_dt, fault)
        if int(priority) > 0:
            # print("=== FAILURE DETECTED ===")
            fault_object = AssetFailure(date=current_dt,
                                        asset_id=asset.id,
                                        fault=diag.fault,
                                        priority=int(priority))
            session.add(fault_object)
            session.commit()
            # recommendation
            recommendation = session.query(Priority.recomendation).filter_by(priority=priority, fault=fault).first()[0]
            # print(f'Recommendation: {recommendation}')
            # upload to PI: timestamp, fault, priority, recommendation
            # NOTE(review): the catch-up cell shifts the timestamp by
            # -7 hours before uploading; here it is uploaded unshifted —
            # confirm which is intended.

            # check whether a new day has started
            # if current_time.strftime('%Y-%m-%d %H:%M:%S').split()[1] == '00:00:00':
            status = upload_recommendation([(current_dt, fault, priority, recommendation)],
                                           point1, point2, point3)
                # print('batch data telah diupload ke pi system!')
            #     temp_upload.clear()
            # else:
            #     temp_upload.append((current_time, fault, priority, recommendation))

        # update diagnose_date
        dgdate = session.query(DiagnoseDate).first()
        dgdate.timestamp = current_dt
        session.commit()
    except Exception:
        # Leave the shared session usable for the next run of this cell; an
        # un-rolled-back failure otherwise surfaces as PendingRollbackError.
        session.rollback()
        raise

    # update current time
    current_time = (current_dt + timedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')

2022-03-10 12:59:00
