# Cobalt Class and Function Definitions
* Define classes and functions needed for analysis and reporting

In [1]:
# Modules
from cobalt_parameters import *

import pandas as pd
import numpy as np

import re
import datetime
from collections import defaultdict

import glob
import os
import psycopg2
from sqlalchemy import create_engine

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns; 

# Class Definitions

In [2]:
class color:
   PURPLE = '\033[95m'
   CYAN = '\033[96m'
   DARKCYAN = '\033[36m'
   BLUE = '\033[94m'
   GREEN = '\033[92m'
   YELLOW = '\033[93m'
   RED = '\033[91m'
   BOLD = '\033[1m'
   UNDERLINE = '\033[4m'
   END = '\033[0m'

# Function Definitions

In [3]:
def database_connect():
    # Read-only database connection
    conn = psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port= '5432')
    conn.autocommit = True
    cursor = conn.cursor()

    # Reporting database connection
    reporting_conn = psycopg2.connect(database=DB_REPORTING_NAME, user=DB_REPORTING_USER, password=DB_REPORTING_PASSWORD, host=DB_REPORTING_HOST, port= '5432')
    reporting_conn.autocommit = True
    reporting_cursor = reporting_conn.cursor()
    
    return cursor,reporting_cursor

In [4]:
def get_table_data(db_cursor, table_name, limit='ALL'):
    # check user input - allows string or numeric input (may change to string only to simpllify error checking)
    if type(limit) == str: 
        if limit != 'ALL':
            try:
                int(limit)
            except:
                print("Please enter an integer value for limit.")
                return
    elif type(limit) == int:
        limit = str(limit)
    else:
        print("Please enter an integer value for limit.")
        return
    
    query = """SELECT * FROM """ + table_name + """ LIMIT """ + limit + """;"""
    db_cursor.execute(query)
    result = db_cursor.fetchall()
    colnames = [desc[0] for desc in db_cursor.description]
    dataframe = pd.DataFrame(result, columns=colnames)
    
    return dataframe

In [5]:
def get_date_str(date, formatted=False):
    year = str(date.year)
    month = str(date.month)
    if len(month)==1:
        month = '0'+month
    day = str(date.day)
    if len(day)==1:
        day = '0'+day
        
    if formatted:
        date = year+'/'+month+'/'+day
    else:
        date = year+month+day
    return date

In [6]:
def get_ts_xlabels(index, time): 
    xlabels = []
    if type(index) == pd.MultiIndex:
        if time == 'weekly':
            for item in index:
                year = str(item[0])
                month = str(item[1])
                if len(month) == 1:
                    month = '0' + month
                day = str(item[2])
                if len(day) == 1:
                    day = '0' + day
                xlabels.append(year + '-' + month + '-' + day)
        if time == 'monthly':
            xlabels = [month_dict[item[1]] + '\n' + str(item[0]) for item in index]
    return xlabels

In [7]:
def get_appointment_heatmap(data, apt_type, grouping, date_col, id_col, date_offset=0, save_fig=False):
    day_names = ['Monday','Tuesday','Wednesday','Thursday','Friday','Saturday','Sunday',]
    day_names_dict = dict({0:'Monday',1:'Tuesday',2:'Wednesday',3:'Thursday',4:'Friday',5:'Saturday',6:'Sunday'})
    
    # Set date range and filename
    if date_offset < 0: # past
        data = data[(data[date_col] >= pd.to_datetime(datetime.datetime.now() + pd.Timedelta(days=date_offset)).tz_localize(tz='US/Eastern')) & 
                    (data[date_col] <= pd.to_datetime(datetime.datetime.now()).tz_localize(tz='US/Eastern'))]
        filename = apt_type + '_appointment_' + grouping + '_last' + str(abs(date_offset)) + 'days_'
        offset_title = 'Last ' + str(abs(date_offset)) + ' Days'
    elif date_offset > 0: # future
        data = data[(data[date_col] <= pd.to_datetime(datetime.datetime.now() + pd.Timedelta(days=date_offset)).tz_localize(tz='US/Eastern')) & 
                    (data[date_col] >= pd.to_datetime(datetime.datetime.now()).tz_localize(tz='US/Eastern'))]
        filename = apt_type + '_appointment_' + grouping + '_next' + str(date_offset) + 'days_'
        offset_title = 'Next ' + str(date_offset) + ' Days'
    else: 
        filename = apt_type + '_appointment_' + grouping + '_allTime_'
        offset_title = 'All Time'
    
    # Prep figure data and labels
    data = data.groupby([grouping,'support_role_id']).count()[id_col]
    data = data.unstack().fillna(0)
    data = data.transpose()
    
    if grouping == 'dayofweek':
        data.columns = [day_names_dict[item] for item in data.columns]
        grouping_title = 'Day of Week'
    elif grouping == 'hourofday':
        grouping_title = 'Hour of Day'
        
    # Plot
    fig, ax = plt.subplots(figsize=(16,8)) 
    sns.heatmap(data, annot=True, linewidths=.5, ax=ax)
    ax.set_title(apt_type.capitalize() + ' Appointments by ' + grouping_title + ': ' + offset_title, fontsize=18)
    
    if save_fig:
        save_figure(fig, FIGURE_PATH, filename)

In [8]:
def save_figure(fig, path, filename):
    name = path + filename + str(datetime.datetime.now().date()).replace('-','') + '.png'
    fig.savefig(name, bbox_inches='tight', pad_inches=0, dpi=SAVE_DPI, transparent=True)

In [9]:
def get_appt_provider_role(row):
    ambiguous_appt_names = ['Initial Visit','Followup Visit','Return Visit']
    ambiguous_appt_names_roles = ['CLINICIAN','PSYCHIATRIST']
    appt_type_name = row['appointment_type_name']
    provider_id = row['provider_id']
    
    if appt_type_name in appt_provider_role_dict.keys():
        appt_role = appt_provider_role_dict[appt_type_name]
    elif appt_type_name in ambiguous_appt_names:
        appt_role = provider_support_role[(provider_support_role['provider_id']==provider_id) & 
                                          (provider_support_role['support_role_id'].isin(ambiguous_appt_names_roles))].support_role_id.values[0]
    else:
        appt_role = 'UNDEFINED'
    
    
    return appt_role

In [10]:
def get_appt_provider_role_df(data, provider_support_role):    
    mult_role = provider_support_role.groupby(['provider_id']).filter(lambda x: len(x)>1)
    mult_role_provID = mult_role.provider_id.unique()

    appt_single_role = data[~data['provider_id'].isin(mult_role_provID)].copy()
    appt_mult_role = data[data['provider_id'].isin(mult_role_provID)].copy()

    appt_single_role = appt_single_role.merge(provider_support_role, how='inner', left_on='provider_id', right_on='provider_id')
    appt_mult_role['support_role_id'] = appt_mult_role.apply(lambda x: get_appt_provider_role(x), axis=1)

    data = pd.concat([appt_single_role,appt_mult_role]).sort_index()

    return data