In [1]:
import boto3
import traceback
import sys
import json
import pandas as pd
import numpy as np

from botocore.config import Config

In [2]:
# AWS clients used throughout the notebook. They all come from one boto3
# session (default credential chain / region); only the Firehose client pins
# its region explicitly.
aws_session = boto3.session.Session()

ts_query_client = aws_session.client('timestream-query')
timestream_write_client = aws_session.client(
    'timestream-write',
    config=Config(
        read_timeout=20,
        max_pool_connections=5000,
        retries={'max_attempts': 10},
    ),
)
sagemaker_runtime = aws_session.client('runtime.sagemaker')
# Despite its name, this is a Kinesis Data *Firehose* client.
kinesis_client = aws_session.client('firehose', region_name='us-west-2')

# Names of the deployed SageMaker endpoint and the Firehose delivery stream.
ENDPOINT_NAME = 'sagemaker-scikit-learn-2021-03-24-04-37-33-374'
stream_name = 'anomalies1-20210310'

In [3]:
# Python type name -> Timestream scalar type.
# NOTE(review): not referenced in the visible cells.
d_types = {'int': 'BIGINT', 'float': 'DOUBLE', 'bool': 'BOOLEAN'}

# Dimensions attached to every anomaly record written back to Timestream.
# Timestream identifies a series by its dimension values, so these must match
# the component's stored values exactly (no stray whitespace).
dimensions = [
    # NOTE(review): the query output shows Factory_Id = 'Octank Oregon' for
    # this component, not '1' -- confirm which value is intended.
    {'Name': 'Factory_Id', 'Value': '1'},
    {'Name': 'Component_Id', 'Value': '65'},
    {'Name': 'Component_Type', 'Value': 'AHU-3/Room-274'},
    {'Name': 'Component_Name', 'Value': 'Thermafuser'},
]

In [4]:
def run_query(paginator, query_string):
    """Run a Timestream query and return all decoded rows as a DataFrame.

    Args:
        paginator: a boto3 ``timestream-query`` ``query`` paginator.
        query_string: the SQL to execute.

    Returns:
        A ``pandas.DataFrame`` built from the tuples produced by
        ``parse_query_result`` for *every* page (positional columns 0..5).
        It is empty when the query yields no rows. Returns ``None`` if the
        query raised; the error and traceback are printed.
    """
    results = []

    try:
        for page in paginator.paginate(QueryString=query_string):
            # A result set can be split across several pages (and a page may
            # hold no rows at all), so collect every page rather than
            # returning after the first one.
            results.extend(parse_query_result(page))
    except Exception as err:
        print("Exception while running query:", err)
        traceback.print_exc(file=sys.stderr)
        return None

    return pd.DataFrame(results)
            
            
def parse_query_result(query_result):
    """Report one Timestream query page and decode its rows.

    Prints the page's ``QueryStatus`` and ``ColumnInfo``, then converts each
    entry of ``Rows`` with ``parse_row``.

    Args:
        query_result: one page returned by the ``query`` paginator.

    Returns:
        list of the tuples produced by ``parse_row``, in row order.
    """
    status = query_result["QueryStatus"]
    columns = query_result['ColumnInfo']

    print(status)
    print("Metadata: %s" % columns)
    print("Data: ")

    return [parse_row(columns, row) for row in query_result['Rows']]
        

def parse_row(column_info, row):
    """Decode one Timestream query result row into a flat tuple.

    Args:
        column_info: the page's ``ColumnInfo`` list; entry ``j`` describes
            ``row['Data'][j]`` (its ``Name`` and ``Type['ScalarType']``).
        row: one entry of the page's ``Rows`` list (a dict with ``Data``).

    Returns:
        ``(idComponent, componentName, componentType, measure_name,
        measure_value, measure_time)``. Columns that are absent or NULL keep
        their defaults: ``None`` for id/name/type/time, ``''`` for
        ``measure_name`` and ``0`` for ``measure_value``. Columns not listed
        here (e.g. ``Factory_Id``) are ignored.
    """
    # Every ScalarValue arrives as a string. BOOLEAN needs an explicit parse
    # because bool('false') is True.
    convert_data = {
        'BOOLEAN': lambda text: text.strip().lower() == 'true',
        'BIGINT': int,
        'VARCHAR': str,
        'DOUBLE': float,
    }

    idComponent = None
    componentName = None
    componentType = None
    measure_name = ''
    measure_value = 0
    measure_time = None

    for info, datum in zip(column_info, row['Data']):
        if datum.get('NullValue') is True:
            continue

        name = info['Name']
        value = datum['ScalarValue']

        if name == 'Component_Id':
            idComponent = int(value)
        elif name == 'Component_Type':
            componentType = value
        elif name == 'Component_Name':
            componentName = value
        elif 'measure_value' in name:
            # Tables holding several measure types expose one
            # measure_value::<type> column per type; the non-NULL one carries
            # this row's value. Unknown scalar types keep the raw string.
            convert = convert_data.get(info['Type']['ScalarType'], str)
            measure_value = convert(value)
        elif name == 'measure_name':
            measure_name = str(value)
        elif name == 'time':
            measure_time = str(value)

    return (idComponent, componentName, componentType, measure_name, measure_value, measure_time)
    

In [5]:
# Every thermafuser reading from the last 10 minutes, oldest first.
QUERY_1 = (
    'SELECT * FROM "octank-america-hvac"."thermafuser_readings" '
    'WHERE time between ago(10m) and now() '
    'ORDER BY time ASC '
)

In [6]:
# Execute QUERY_1 and collect the decoded rows.
paginator = ts_query_client.get_paginator('query')
res_df = run_query(paginator, QUERY_1)
# run_query prints the error and returns None on failure; stop here with a
# clear message rather than an AttributeError in the next cell.
if res_df is None:
    raise RuntimeError('Timestream query failed; see the traceback above.')

{'ProgressPercentage': 100.0, 'CumulativeBytesScanned': 481080, 'CumulativeBytesMetered': 10000000}
Metadata: [{'Name': 'Factory_Id', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'Component_Id', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'Component_Type', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'Component_Name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_value::boolean', 'Type': {'ScalarType': 'BOOLEAN'}}, {'Name': 'measure_value::double', 'Type': {'ScalarType': 'DOUBLE'}}, {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}]
Data: 
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '61'}, {'ScalarValue': 'AHU-3/Room-270'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '61'}, {'ScalarValue': 'AHU-3/Room-270'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '61'}, {'ScalarValue': 'AHU-3/Room-27

[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '65'}, {'ScalarValue': 'AHU-3/Room-274'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '65'}, {'ScalarValue': 'AHU-3/Room-274'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '65'}, {'ScalarValue': 'AHU-3/Room-274'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '65'}, {'ScalarValue': 'AHU-3/Room-274'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '61'}, {'ScalarValue': 'AHU-3/Room-270'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '61'}, {'ScalarValue': 'AHU-3/Room-270'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '61'}, {'ScalarValue': 'AHU-3/Room-270'}, {'ScalarValue': 'Thermafuser'},

[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '62'}, {'ScalarValue': 'AHU-3/Room-271'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '62'}, {'ScalarValue': 'AHU-3/Room-271'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '62'}, {'ScalarValue': 'AHU-3/Room-271'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '62'}, {'ScalarValue': 'AHU-3/Room-271'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '62'}, {'ScalarValue': 'AHU-3/Room-271'}, {'ScalarValue': 'Thermafuser'}, {'NullValue': True}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '62'}, {'ScalarValue': 'AHU-3/Room-271'}, {'ScalarValue': 'Thermafuser'}, {'ScalarValue': 'true'}]
[{'ScalarValue': 'Octank Oregon'}, {'ScalarValue': '63'}, {'ScalarValue': 'AHU-3/Room-272'}, {'ScalarValue': 'Thermafuse

In [7]:
# Give the positional columns produced by parse_row readable names.
res_df = res_df.rename(columns=dict(enumerate(['id', 'name', 'type', 'measure', 'value', 'time'])))
res_df.head(n=10)

Unnamed: 0,id,name,type,measure,value,time
0,61,Thermafuser,AHU-3/Room-270,terminalLoad,24.0,2021-03-25 21:33:45.603000000
1,61,Thermafuser,AHU-3/Room-270,airflowFeedback,76.0,2021-03-25 21:33:45.603000000
2,61,Thermafuser,AHU-3/Room-270,supplyAir,61.0,2021-03-25 21:33:45.603000000
3,61,Thermafuser,AHU-3/Room-270,occupiedHeatingSetpoint,71.607841,2021-03-25 21:33:45.603000000
4,61,Thermafuser,AHU-3/Room-270,zoneTemperature,75.599998,2021-03-25 21:33:45.603000000
5,61,Thermafuser,AHU-3/Room-270,roomOccupied,True,2021-03-25 21:33:45.603000000
6,61,Thermafuser,AHU-3/Room-270,occupiedCoolingSetpoint,76.607841,2021-03-25 21:33:45.603000000
7,62,Thermafuser,AHU-3/Room-271,roomOccupied,True,2021-03-25 21:33:45.649000000
8,62,Thermafuser,AHU-3/Room-271,zoneTemperature,71.699997,2021-03-25 21:33:45.649000000
9,62,Thermafuser,AHU-3/Room-271,supplyAir,58.900002,2021-03-25 21:33:45.649000000


In [None]:
# Keep one component's readings and reshape them to one row per timestamp,
# one column per measure.
id_df = res_df[res_df['id'] == 65]
componentName, componentType = id_df.iloc[0][['name', 'type']]

pivoted_df = (
    id_df
    .pivot(index="time", columns="measure", values="value")
    .reset_index()
)

In [None]:
pivoted_df.head(n=5)

In [None]:
# Score the pivoted readings on the SageMaker endpoint.
# NOTE(review): `payload` is already a JSON string, so json.dumps encodes it a
# second time; presumably the endpoint's input handler decodes twice --
# confirm before changing either side.
payload = pivoted_df.to_json()
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=ENDPOINT_NAME,
    ContentType='application/json',
    Accept='application/json',
    Body=json.dumps(payload),
)
bresponse = response['Body'].read()
predictions = np.array(json.loads(bresponse))
anomalies_zero = predictions + 1  # NOTE(review): not used in the visible cells

In [None]:
print(str(predictions))

In [None]:
# The endpoint can return fewer predictions than input rows; the predictions
# are aligned with the *last* rows of pivoted_df.
print(pivoted_df.shape)
print(predictions.shape)

predictions_start_index = pivoted_df.shape[0] - predictions.shape[0]
# A negative offset would slice from the end of the frame and misalign the
# predictions in the next cell.
assert predictions_start_index >= 0, (
    f'endpoint returned {predictions.shape[0]} predictions for only {pivoted_df.shape[0]} rows'
)
print(predictions_start_index)

In [None]:
# Keep only the rows that received a prediction and attach the labels.
predicted_df = pivoted_df.iloc[predictions_start_index:].copy()
predicted_df['anomaly'] = predictions
print(predicted_df.shape)

In [None]:
predicted_df.head(n=5)

In [None]:
# Only the timestamp and its label are needed downstream.
predicted_df = predicted_df.loc[:, ['time', 'anomaly']]
predicted_df

In [None]:
def anomalies_to_s3(df_anomalies, idFactory, idComponent, componentName, componentType):
    """Turn the rows labelled as anomalies (-1) into record dicts.

    The function itself performs no I/O; it only shapes the data.

    Args:
        df_anomalies: DataFrame with ``time`` and ``anomaly`` columns. It is
            not modified.
        idFactory, idComponent, componentName, componentType: constants copied
            onto every record as ``factoryId``, ``objectId``, ``name`` and
            ``type``.

    Returns:
        list of dicts (``DataFrame.to_dict(orient='records')``), one per row
        whose ``anomaly`` equals -1, with ``time`` renamed to ``timestamp``.
    """
    flagged = df_anomalies.rename(columns={'time': 'timestamp'})
    flagged = flagged[flagged['anomaly'] == -1]
    flagged = flagged.assign(
        factoryId=idFactory,
        objectId=idComponent,
        name=componentName,
        type=componentType,
    )
    return flagged.to_dict(orient='records')

In [None]:
def anomalies_df_to_timestream(df_anomalies, dimensions, max_records=50):
    """Build Timestream ``WriteRecords`` records for the ``anomaly`` column.

    Args:
        df_anomalies: DataFrame with a ``time`` column holding timestamp
            strings as returned by the query (e.g.
            ``'2021-03-25 21:33:45.603000000'``) and an integer-valued
            ``anomaly`` column.
        dimensions: list of ``{'Name': ..., 'Value': ...}`` dicts attached to
            every record.
        max_records: only the first ``max_records`` rows are converted
            (default 50, the previous hard-coded cap; a single WriteRecords
            call accepts at most 100 records).

    Returns:
        list of record dicts, one per converted row, in row order.
    """
    head = df_anomalies.iloc[:max_records]

    records = []
    for ts, value in zip(head['time'], head['anomaly']):
        records.append({
            'Dimensions': dimensions,
            'MeasureName': 'anomaly',
            # BIGINT measures must be integer strings ('-1', not '-1.0').
            'MeasureValue': str(int(value)),
            'MeasureValueType': 'BIGINT',
            # WriteRecords expects Time as an offset from the epoch, not a
            # formatted date. Naive query timestamps are treated as UTC.
            'Time': str(pd.Timestamp(ts).value),
            'TimeUnit': 'NANOSECONDS',
        })

    return records

In [None]:
# Send the rows labelled as anomalies to the Firehose delivery stream.
try:
    # NOTE(review): Factory_Id 1 / Component_Id 65 duplicate the component
    # selected earlier in the notebook -- keep them in sync.
    records = anomalies_to_s3(predicted_df, 1, 65, componentName, componentType)
    # Depending on the pandas version, record values may be numpy scalars,
    # which json cannot serialize natively; .item() converts them.
    # NOTE(review): Firehose concatenates records as-is; add a '\n' delimiter
    # if downstream readers expect newline-delimited JSON.
    fs_records = [{'Data': json.dumps(x, default=lambda o: o.item())} for x in records]
    print(records)

    if not fs_records:
        # PutRecordBatch rejects an empty Records list.
        print('No anomalies to send.')
    else:
        result = kinesis_client.put_record_batch(DeliveryStreamName=stream_name, Records=fs_records)

        if result['ResponseMetadata']['HTTPStatusCode'] != 200:
            print('Status: ' + str(result['ResponseMetadata']), file=sys.stderr)
        elif result.get('FailedPutCount', 0):
            # An HTTP 200 response can still carry per-record failures.
            failed = [r for r in result.get('RequestResponses', []) if 'ErrorCode' in r]
            print(f"{result['FailedPutCount']} of {len(fs_records)} records failed: {failed}",
                  file=sys.stderr)
except Exception as err:
    print("Error:", err)
    traceback.print_exc()

In [None]:
records_ts = anomalies_df_to_timestream(df_anomalies=predicted_df, dimensions=dimensions)
print(str(records_ts))

In [None]:
# Write the anomaly labels back into the readings table as an `anomaly` measure.
if not records_ts:
    # WriteRecords requires at least one record.
    print('No records to write to Timestream.')
else:
    try:
        result = timestream_write_client.write_records(
            DatabaseName='octank-america-hvac',
            TableName='thermafuser_readings',
            Records=records_ts,
            CommonAttributes={},
        )

        if result['ResponseMetadata']['HTTPStatusCode'] != 200:
            print('Status: ' + str(result['ResponseMetadata']))
    except timestream_write_client.exceptions.RejectedRecordsException as err:
        # Some records were rejected; the response lists which ones and why.
        print("RejectedRecords:", err.response.get('RejectedRecords'))
    except Exception as err:
        print("Error:", err)
        traceback.print_exc()