In [220]:
import os
import json
import time
import logging
from datetime import datetime

import botocore
import boto3
import psycopg2
import psycopg2.extras as extras
import pandas as pd

from functools import wraps

In [248]:
# Variable Initializations
# Every setting is read from a required environment variable; a missing
# variable raises KeyError as soon as this cell runs.
filepath_metadata = os.environ['FILEPATH_METADATA']
SAMPLING_FREQUENCY = os.environ['SAMPLING_FREQUENCY']
DB_HOST = os.environ['DB_HOST']
DB_PORT = os.environ['DB_PORT']
DB_USER = os.environ['DB_USER']
DB_NAME = os.environ['DB_NAME']
DB_PASSWORD = os.environ['DB_PASSWORD']
# Environment values are always strings, so DEBUG has to be parsed: a raw
# string such as "False" or "0" is non-empty and therefore truthy, which would
# switch debug behaviour on regardless of the configured value.
DEBUG = os.environ['DEBUG'].strip().lower() in ('1', 'true', 't', 'yes', 'y', 'on')  # boolean

In [249]:
# Database columns of the meteorological table. The long-wave radiation
# columns are spelled 'lwin'/'lwout' (lowercase L), matching the keys of
# column_agg_mapping below; with any other spelling split_met_aqm() would not
# recognise them and would leave them out of the meteorological frame.
met_columnnames = ['datetime', 'station', 'interval_length', 'sp04', 'sp10', 'gu04', 'gu10', 'dn04', 'dn10', 'sg04', 'sg10', 'at02', 'at10', 'delt', 'rh02', 'rh10', 'pres', 'rain', 'swin', 'swout', 'lwin', 'lwout', 'grad', 'nrad']
# Database columns of the air-quality table.
aqm_columnnames = ['datetime', 'station', 'interval_length', 'pm2_5', 'pm10', 'pmcrs', 'so2', 'co', 'no', 'no2', 'nox', 'o3']
# Aggregation applied to each measurement column when resampling:
# everything is averaged except 'rain', which is summed.
column_agg_mapping = {
    'sp04': 'mean', 'sp10': 'mean', 'gu04': 'mean', 'gu10': 'mean',
    'dn04': 'mean', 'dn10': 'mean', 'sg04': 'mean', 'sg10': 'mean',
    'at02': 'mean', 'at10': 'mean', 'delt': 'mean', 'rh02': 'mean',
    'rh10': 'mean', 'pres': 'mean', 'rain': 'sum', 'swin': 'mean',
    'swout': 'mean', 'lwin': 'mean', 'lwout': 'mean', 'grad': 'mean',
    'nrad': 'mean', 'pm2_5': 'mean', 'pm10': 'mean', 'pmcrs': 'mean',
    'so2': 'mean', 'co': 'mean', 'no': 'mean', 'no2': 'mean',
    'nox': 'mean', 'o3': 'mean'
}

In [250]:
logger = logging.getLogger(__name__)
# DEBUG switches the logger between verbose (DEBUG) and normal (INFO) output
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)

# Database connection parameters, unpacked into psycopg2.connect() by connect_db()
params_dict = dict(
    host=DB_HOST,
    port=DB_PORT,
    database=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
)

In [251]:
def load_metadata(filepath):
    """
    Parses the JSON metadata file and returns its content.

    :param filepath: path of the metadata file (JSON)
    :returns: the decoded JSON document, or None when the file could not be
        read and DEBUG is falsy
    :raises IOError: re-raised only when DEBUG is truthy
    """
    try:
        with open(filepath) as metadata_file:
            return json.load(metadata_file)
    except IOError:
        logger.error("An error has occurred. No such file found!")
        if DEBUG:
            raise
    return None
def get_translation_from_filename(filename, metadata):
    """
    Looks up the column-name mapping ('columnnames') of the first entry in
    metadata['translations'] whose 'filename' equals `filename`, ignoring case.

    Returns None when there is no such entry, when the metadata has no
    'translations' key, or (after logging an error) when the metadata is not
    a dict.
    """
    if not isinstance(metadata, dict):
        logger.error("Metadata is not Dictionary Object.")
        return None

    if 'translations' not in metadata:
        return None

    matching_entries = (
        entry for entry in metadata['translations']
        if entry['filename'].lower() == filename.lower()
    )
    first_match = next(matching_entries, None)
    return None if first_match is None else first_match['columnnames']

def get_metadata_from_filename(filename, metadata):
    """
    Returns the complete entry of metadata['translations'] whose 'filename'
    matches `filename` case-insensitively (first match wins).

    Yields None if nothing matches or the 'translations' key is absent; a
    non-dict `metadata` is reported through the logger and also gives None.
    """
    if isinstance(metadata, dict):
        for file_entry in metadata.get('translations', ()):
            if file_entry['filename'].lower() != filename.lower():
                continue
            return file_entry
        return None

    logger.error("Metadata is not Dictionary Object.")
    return None
        
def connect_db(params_dict):
    """
    Connect to the PostgreSQL database server.

    :param params_dict: keyword arguments passed on to psycopg2.connect()
    :returns: the open connection, or None when connecting failed and DEBUG
        is falsy
    :raises Exception: the original connection error, re-raised only when
        DEBUG is truthy
    """
    conn = None
    try:
        logger.debug('Connecting to the PostgreSQL Database...')
        conn = psycopg2.connect(**params_dict)
    except Exception as error:
        # Exception already covers psycopg2.DatabaseError. Keep the cause in
        # the log so a failed connection can be diagnosed.
        logger.error("An error occurred while establishing connection to database: %s", error)

        if DEBUG:
            raise
    else:
        # Only report success when connect() actually returned a connection.
        logger.debug('Connection Successful.')

    return conn

def split_met_aqm(df, met_columnnames, aqm_columnnames):
    """
    Separates a combined dataframe into its Meteorological and Air quality
    parts.

    Each returned frame keeps, in their original order, the columns of `df`
    that are listed in the corresponding name list; a column listed in both
    appears in both frames.

    :returns: (meteorological dataframe, air quality dataframe)
    """
    met_selection = [name for name in df.columns if name in met_columnnames]
    aqm_selection = [name for name in df.columns if name in aqm_columnnames]

    return df[met_selection], df[aqm_selection]

def predelete_records(conn, min_datetime, max_datetime, tablename, station_id):
    """
    Deletes the rows of core_data.<tablename> that belong to `station_id` and
    whose datetime lies in [min_datetime, max_datetime], so that freshly
    calculated records can be inserted for that range.

    :param conn: open database connection. It is used for the DELETE and left
        open for the caller. When None, a temporary connection is opened with
        connect_db(params_dict) and closed again before returning.
    :param min_datetime: lower bound of the range (inclusive)
    :param max_datetime: upper bound of the range (inclusive)
    :param tablename: table inside the core_data schema. It is concatenated
        into the statement and must therefore come from trusted code, never
        from external input.
    :param station_id: value compared against the station column
    :raises Exception: the database error, re-raised only when DEBUG is truthy
    """
    owns_connection = conn is None
    if owns_connection:
        conn = connect_db(params_dict)
    if conn is None:
        # No connection could be established; connect_db() has logged why.
        return

    # Values travel as query parameters so the driver does the quoting;
    # only the (trusted) table name is part of the SQL text.
    query = (
        "DELETE FROM core_data." + tablename
        + " WHERE station = %s AND datetime >= %s AND datetime <= %s"
    )
    cur = conn.cursor()
    try:
        cur.execute(query, (station_id, min_datetime, max_datetime))
        conn.commit()

        count = cur.rowcount
        logger.info(f"{count} records deleted.")

    except Exception as error:
        logger.error("An error occurred while deleting existing records from %s: %s", tablename, error)
        conn.rollback()
        if DEBUG:
            raise
    finally:
        cur.close()
        # A connection handed in by the caller is still needed by the caller.
        if owns_connection:
            conn.close()
                
def get_min_max_datetime(df):
    """
    Gives the smallest and the largest value of the dataframe index,
    where the index is the `datetime` time-series index.

    :returns: (minimum, maximum)
    """
    time_index = df.index
    earliest = time_index.min()
    latest = time_index.max()
    return earliest, latest

In [252]:
# Load the metadata from the location configured through the
# FILEPATH_METADATA environment variable instead of a hard-coded file name,
# so the configured path is actually honoured.
metadata = load_metadata(filepath_metadata)

In [253]:
def execute_batch_insertion(conn, min_datetime, max_datetime, df, fieldnames, tablename, station_id, page_size=100):
    """
    Replaces the records of one station in core_data.<tablename> with the
    rows of `df`, using psycopg2.extras.execute_batch().

    Existing rows between min_datetime and max_datetime are removed first via
    predelete_records(); afterwards every row of `df` is inserted.

    :param conn: open database connection; it is always closed before this
        function returns. When None, an error is logged and nothing is
        deleted or inserted.
    :param min_datetime: start of the range that is deleted beforehand
    :param max_datetime: end of the range that is deleted beforehand
    :param df: dataframe whose columns are in the same order as `fieldnames`
    :param fieldnames: target column names, one per dataframe column
    :param tablename: table inside the core_data schema
    :param station_id: station whose rows are replaced
    :param page_size: number of statements sent per batch
    :raises Exception: the database error, re-raised only when DEBUG is truthy
    """
    if conn is None:
        # Stop before the pre-delete: removing rows that cannot be
        # re-inserted would only lose data.
        logger.error("No database connection available; nothing was loaded into %s.", tablename)
        return

    # predelete_records() commits its DELETE on its own, so a failed insert
    # below does not bring the deleted rows back.
    predelete_records(conn, str(min_datetime), str(max_datetime), str(tablename), str(station_id))

    # batch execution
    tuples = [tuple(row) for row in df.to_numpy()]
    placeholders = ','.join(['%s'] * len(fieldnames))
    query = 'INSERT INTO ' + 'core_data.' + tablename + '(' + ','.join(fieldnames) + ') VALUES(' + placeholders + ');'

    cur = conn.cursor()
    try:
        extras.execute_batch(cur, query, tuples, page_size)
        conn.commit()
        # Reported only after the commit went through.
        logger.info("Successfully loaded all data to PostgreSQL")
    except Exception as error:
        logger.error("An error occurred while inserting records into %s: %s", tablename, error)
        conn.rollback()
        if DEBUG:
            raise
    finally:
        cur.close()
        conn.close()

In [272]:
def _insert_hourly_frame(df, fieldnames, tablename, station_id, page_size):
    """
    Opens a new database connection and passes `df` to
    execute_batch_insertion() for `tablename`.
    """
    min_datetime, max_datetime = get_min_max_datetime(df)

    # execute_batch_insertion() closes the connection it is given, so every
    # insertion gets a connection of its own.
    conn = connect_db(params_dict)
    if conn is None:
        # connect_db() returns None when connecting failed and DEBUG is off.
        logger.error(f"No database connection; {tablename} was not loaded.")
        return

    execute_batch_insertion(
        conn,
        min_datetime, max_datetime,
        df, fieldnames, tablename,
        station_id,
        page_size
    )


def load_to_db(filename, metadata, met_tablename='dwer_met', aqm_tablename='dwer_aqm'):
    """
    Reads one raw CSV file, aggregates it to SAMPLING_FREQUENCY records and
    loads the result into the database.

    The metadata entry is found from the file name with its trailing
    '_<date>' part removed. Columns are renamed as described by that entry,
    aggregated with column_agg_mapping and written to the meteorological
    table, the air quality table or both, depending on the entry's 'filetype'
    ('m', 'a' or 'ma').

    :param filename: path of the CSV file, named '<name>_<date>.<extension>'
    :param metadata: metadata as returned by load_metadata()
    :param met_tablename: target table for meteorological data
    :param aqm_tablename: target table for air quality data
    :returns: None. The file is skipped (with an error logged) when it has no
        translation in the metadata, when aggregation fails while DEBUG is
        falsy, or when no record is left after aggregation.
    """
    # extract only filenames from the file_path:
    # '<dir>/<name>_<date>.<extension>' -> '<name>.<extension>'
    splitted_filenames = filename.split('_')
    date, extension = splitted_filenames[-1].split('.')
    filename_without_date = '_'.join(splitted_filenames[0:-1]) + '.' + extension
    csv_file = filename_without_date.split('/')[-1]

    # get the column names mapping from metadata
    columnnames_tx = get_translation_from_filename(csv_file, metadata)

    if not columnnames_tx:
        # Without a mapping nothing below can work, so stop here.
        logger.error(f"Could not find translations of {csv_file}.")
        return

    df = pd.read_csv(filename)

    # processing the dataframe
    if 'Date_Time.1' in df.columns:
        df = df.drop('Date_Time.1', axis=1)

    # rename columns as in MetaData
    df = df.rename(columns=columnnames_tx)

    # drop duplicate columns
    df = df.loc[:, ~df.columns.duplicated()]

    std_columns = list(columnnames_tx.values())

    # drop columns if not present in Metadata. The list is built after the
    # de-duplication above so that no name is dropped twice.
    unknown_columns = [column for column in df.columns if column not in std_columns]
    for value in unknown_columns:
        logger.info(f"Dropping {value} column")
        df = df.drop(value, axis=1)
        logger.info(f"Successfully dropped {value} column")
        logger.error(f"{value} is missing in {csv_file}")

    # generate column aggregate mapping for a given file; the first mapped
    # column is skipped (presumably the datetime column - TODO confirm that
    # every metadata entry lists it first)
    col_agg_mappings = {key: column_agg_mapping[key] for key in std_columns[1:]}

    # set `datetime` as the index of df
    data_series = pd.notnull(df["datetime"])
    df['datetime'] = df['datetime'][data_series].apply(lambda x: datetime.strptime(x, '%d%m%Y %H%M'))
    df = df.set_index(pd.DatetimeIndex(df['datetime']))

    # SpecificationError is exposed as pandas.errors.SpecificationError in
    # current pandas and as pandas.core.base.SpecificationError in older ones.
    try:
        from pandas.errors import SpecificationError
    except ImportError:
        from pandas.core.base import SpecificationError

    # generate hourly average records
    try:
        df_hourly = df.resample(SAMPLING_FREQUENCY).agg(col_agg_mappings)
    except SpecificationError as error:
        logger.error(f"Could not aggregate {csv_file}: {error}")
        if DEBUG:
            raise
        return

    if 'rain' in df_hourly.columns:
        # A rain total of 0 is stored as Null.
        # NOTE(review): the sum of an interval without readings is 0 as well,
        # presumably the reason for this - confirm that real zero-rain
        # intervals should also become Null.
        df_hourly.loc[df_hourly['rain'] == 0, 'rain'] = None

    # drop records if all the records are null
    df_hourly = df_hourly.dropna(how='all', axis=0)

    if df_hourly.empty:
        logger.error(f"No records left in {csv_file} after aggregation.")
        return

    # Round while the columns are still numeric; the object columns produced
    # below would not be rounded.
    df_hourly = df_hourly.round(4)

    # add sampled datetime index to new column
    df_hourly.insert(0, 'datetime', df_hourly.index)

    # replace NaN to Null(None). The cast to object is required: in a float
    # column None is turned back into NaN.
    df_hourly = df_hourly.astype(object).where(df_hourly.notna(), None)

    min_datetime, max_datetime = get_min_max_datetime(df_hourly)

    # log minimum and maximum datetime, total records
    logger.info(f"MIN DATETIME in {csv_file}: {min_datetime}")
    logger.info(f"MAX DATETIME in {csv_file}: {max_datetime}")
    logger.info(f"Total records in {csv_file}: {len(df_hourly)}")

    # get metadata of the station
    station_metadata = get_metadata_from_filename(csv_file, metadata)

    station_id = station_metadata['stationid']
    df_hourly['datetime'] = df_hourly['datetime'].apply(lambda x: x.to_pydatetime())
    df_hourly.insert(1, 'station', int(station_id))
    df_hourly.insert(2, 'interval_length', 60)

    # Taken from the frame itself so the INSERT column list always matches
    # the order of the row tuples.
    fieldnames = list(df_hourly.columns)

    filetype = station_metadata['filetype'].lower()

    if filetype == 'm':
        logger.debug(f"{csv_file} has Meteorological Data.")
        _insert_hourly_frame(df_hourly, fieldnames, met_tablename, station_id, 50)
    elif filetype == 'a':
        logger.debug(f"{csv_file} has Air Quality Data.")
        _insert_hourly_frame(df_hourly, fieldnames, aqm_tablename, station_id, 50)
    elif filetype == 'ma':
        logger.debug(f"{csv_file} has both Meteorological and Air Quality Data.")

        # split the combined dataframe
        df_hourly_met, df_hourly_aqm = split_met_aqm(df_hourly, met_columnnames, aqm_columnnames)

        _insert_hourly_frame(df_hourly_met, list(df_hourly_met.columns), met_tablename, station_id, 1)
        _insert_hourly_frame(df_hourly_aqm, list(df_hourly_aqm.columns), aqm_tablename, station_id, 1)
    else:
        logger.error("Filetype Mismatched!")


In [273]:
# Ad-hoc run: load one sample CSV (path relative to the working directory)
# into the meteorological and air quality tables.
# NOTE(review): as a top-level statement this runs whenever the cell/file is
# executed - confirm it is meant to stay outside of local experiments.
load_to_db('Collie_Raw_Data_20200719.csv', metadata, 'dwer_met', 'dwer_aqm')

In [181]:
def lambda_handler(event, context):
    """
    AWS Lambda entry point for an S3 event: downloads the object named in the
    first record to /tmp and loads it with load_to_db().

    :param event: S3 event; only event['Records'][0] is processed
    :param context: Lambda context object (unused)

    Download failures (botocore ClientError) are logged and not re-raised.
    NOTE(review): `s3_client` is not defined in the cells shown here; it is
    presumably created elsewhere (e.g. boto3.client('s3')) - confirm.
    """
    from urllib.parse import unquote_plus

    record = event['Records'][0]
    bucket = record['s3']['bucket']['name']
    # Object keys arrive URL-encoded in S3 event notifications (a space is
    # sent as '+'), so decode before using the key for the download.
    filename = unquote_plus(record['s3']['object']['key'])

    csv_file = filename.split('/')[-1]
    file_path = '/tmp/{}'.format(csv_file)

    try:
        s3_client.download_file(bucket, filename, file_path)
        logger.debug(f"Successfully downloaded {csv_file} from s3")
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            logger.error("The object does not exist")
        else:
            # Any other failure (e.g. access denied) must not pass silently.
            logger.error(f"Could not download {filename} from {bucket}: {e}")
    else:
        logger.debug(f"Started loading {csv_file} data")

        load_to_db(file_path, metadata, 'dwer_met', 'dwer_aqm')
    finally:
        logger.info("Completed!")