In [155]:
# -*- coding: utf-8 -*-

"""PyAthena library for mapped data

This module provides helper functions for querying data in the
mapped data folder through Amazon Athena (via PyAthena).

Example:

    df, meta = athena_get_df(
        id=os.getenv('AWS_ACCESS_KEY_ID'),
        key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        bucket='sentient-science-customer-test',
        schema='customer-test',
        plant='kcw',
        devices=['T01', 'T02'],
        metrics=['gbxoiltemp'],
        stats=['mean'],
        start='2018-03-11T00:00:00',
        end='2018-03-11T03:00:00'
    )

    df = df.melt(id_vars=['plant', 'device', 'metric', 'timestamp'], 
                 value_vars=[col for col in df.columns if col in VALUES],
                 var_name='statistic', value_name='value')

    df['new_col_id'] = df.plant + '-' + df.device + '-' + df.metric + \
                        '-' + df.statistic

    df = df.pivot(index='timestamp', columns='new_col_id', values='value')

    
Globals: 

  * VALUES: a list of fields found in mapped data that contain values
  * IDS: a list of fields found in mapped data that contain row identifiers

To Do: 
  * is it better to flatten/pivot with sql or pandas?
  * add multi-plant support
  * log the timer for athena query
  
"""

import os
from pyathena import connect
from pyathena.util import as_pandas
from datetime import datetime
from time import time

VALUES = ['mean', 'min', 'max', 'stdev', 'count']
IDS = ['plant', 'device', 'metric', 'timestamp']

def format_stats(stats: list):
    """Render the statistic columns for the SELECT clause.

    Args:
        stats: names of the value columns to select. When empty, every
            column listed in the module-level ``VALUES`` is selected.

    Returns:
        A comma-separated string of column names, e.g. ``"mean, max"``.
    """
    if len(stats) > 0:
        # Delete the list brackets and quote marks from the list's repr,
        # which leaves just "a, b, c".
        unwanted = str.maketrans("", "", "[]'")
        return str(stats).translate(unwanted)

    return ", ".join(VALUES)

def format_metrics(metrics: list):
    """Build the ``metric in (...)`` filter for the WHERE clause.

    Each string value is emitted as a single-quoted SQL literal with any
    embedded single quote doubled. Stripping the brackets off the list's
    ``repr`` (the previous approach) switched to double quotes for values
    containing ``'`` and doubled backslashes, producing wrong or invalid SQL.

    Args:
        metrics: metric names to filter on. Empty (or None) means no filter.

    Returns:
        ``"metric in ('a', 'b')"``, or ``""`` when there is nothing to
        filter on.
    """
    if not metrics:
        return ""

    literals = [
        # Non-string values keep their repr, as before (e.g. 1 -> 1).
        "'{}'".format(value.replace("'", "''")) if isinstance(value, str) else repr(value)
        for value in metrics
    ]

    return 'metric in ({})'.format(", ".join(literals))

def format_devices(devices: list):
    """Build the ``and device in (...)`` filter for the WHERE clause.

    Each string value is emitted as a single-quoted SQL literal with any
    embedded single quote doubled. Stripping the brackets off the list's
    ``repr`` (the previous approach) switched to double quotes for values
    containing ``'`` and doubled backslashes, producing wrong or invalid SQL.

    Args:
        devices: device names to filter on. Empty (or None) means no filter.

    Returns:
        ``"and device in ('T01', 'T02')"`` (note the leading ``and``, which
        the caller relies on), or ``""`` when there is nothing to filter on.
    """
    if not devices:
        return ""

    literals = [
        # Non-string values keep their repr, as before (e.g. 1 -> 1).
        "'{}'".format(value.replace("'", "''")) if isinstance(value, str) else repr(value)
        for value in devices
    ]

    return 'and device in ({})'.format(", ".join(literals))

def athena_get_df(id: str, key: str, bucket: str, schema: str,
                  plant: str, devices: list = [], metrics: list = [],
                  stats: list = [], start: str = '1970-01-01T00:00:00',
                  end: str = None):
    """Query the mapped table of one plant through Athena.

    Runs a SELECT against ``test_scada_mapped_<plant>`` and returns the
    result as a pandas DataFrame together with some metadata.

    Args:
        id: AWS access key id.
        key: AWS secret access key.
        bucket: S3 bucket; ``s3://<bucket>/scada/temp`` is used as the
            Athena staging directory.
        schema: Athena schema name.
        plant: plant identifier, used as the table-name suffix.
        devices: device names to filter on; empty means all devices.
        metrics: metric names to filter on; empty means all metrics.
        stats: value columns to select; empty means every column in VALUES.
        start: inclusive lower timestamp bound (``YYYY-MM-DDTHH:MM:SS``).
        end: inclusive upper timestamp bound. Defaults to the current local
            time, evaluated on each call.

    Returns:
        A ``(DataFrame, meta)`` tuple, where ``meta`` holds the executed
        ``'query'`` string and the elapsed ``'query-time'`` in seconds.

    Note:
        ``plant``, ``start`` and ``end`` are interpolated into the SQL text
        as-is; only pass trusted values.
        The region is fixed to ``us-east-1``.
    """
    begin = time()

    if end is None:
        # Resolved per call (a default evaluated in the signature is frozen at
        # definition time). %M is minutes; the previous format used %m (month).
        end = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

    # Collect the WHERE conditions, skipping the empty strings the helpers
    # return for "no filter". Joining a list avoids the dangling
    # "where  and ..." that the old string clean-up left behind when both
    # metrics and devices were empty.
    conditions = []
    for clause in (format_metrics(metrics), format_devices(devices)):
        clause = clause.strip()
        if clause[:4].lower() == 'and ':
            # format_devices prefixes its clause with "and".
            clause = clause[4:].lstrip()
        if clause:
            conditions.append(clause)
    conditions.append("timestamp >= '{}'".format(start))
    conditions.append("timestamp <= '{}'".format(end))

    query = ("SELECT plant, device, metric, timestamp, {} "
             "FROM test_scada_mapped_{} where {} "
             "order by plant, device, timestamp").format(
                 format_stats(stats), plant, " and ".join(conditions))

    connection = connect(aws_access_key_id=id,
                         aws_secret_access_key=key,
                         s3_staging_dir='s3://{}/scada/temp'.format(bucket),
                         schema_name=schema,
                         region_name='us-east-1')
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        frame = as_pandas(cursor)
    finally:
        # Release the connection even when the query or the fetch fails.
        connection.close()

    return frame, {'query': query,
                   'query-time': time() - begin}

In [156]:
if __name__ == "__main__":

    # Demo run: three hours of mean gearbox-oil temperature for two devices
    # of plant 'kcw'. Credentials are read from the environment.
    df, meta = athena_get_df(
        plant='kcw',
        bucket='sentient-science-customer-test',
        schema='customer-test',
        id=os.getenv('AWS_ACCESS_KEY_ID'),
        key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        devices=['T01', 'T02'],
        metrics=['gbxoiltemp'],
        stats=['mean'],
        start='2018-03-11T00:00:00',
        end='2018-03-11T03:00:00',
    )

    # Wide -> long: one row per (plant, device, metric, timestamp, statistic).
    df = df.melt(
        id_vars=['plant', 'device', 'metric', 'timestamp'],
        value_vars=[name for name in df.columns if name in VALUES],
        var_name='statistic',
        value_name='value',
    )

    # Long -> wide again, with one column per plant-device-metric-statistic
    # combination and the timestamp as index.
    df = df.assign(
        new_col_id=df['plant'] + '-' + df['device'] + '-' + df['metric']
        + '-' + df['statistic']
    ).pivot(index='timestamp', columns='new_col_id', values='value')

SELECT plant, device, metric, timestamp, mean FROM test_scada_mapped_kcw where metric in ('gbxoiltemp') and device in ('T01', 'T02') and timestamp >= '2018-03-11T00:00:00' and timestamp <= '2018-03-11T03:00:00' order by plant, device, timestamp


In [157]:
meta

{'query': "SELECT plant, device, metric, timestamp, mean FROM test_scada_mapped_kcw where metric in ('gbxoiltemp') and device in ('T01', 'T02') and timestamp >= '2018-03-11T00:00:00' and timestamp <= '2018-03-11T03:00:00' order by plant, device, timestamp",
 'query-time': 6.709803104400635}