Links to other notebooks in the same folder:
<a href='http://pivotal.io/data-science'><img src='https://raw.githubusercontent.com/crawles/Logos/master/Pivotal_TealOnWhite.png' width='200px' align='right'></a>

<nav class = "navbar navbar-light bg-faded">
    <ul class = "nav navbar-nav">
        <li class = "">
            <a class = "nav-link">notebook1</a>
        </li>
        <li class = "">
            <a class = "nav-link">notebook2</a>
        </li>
        <li class = "">
            <a class = "nav-link">notebook3</a>
        </li>
        

# Table of Contents
 <p><div class="lev1 toc-item"><a href="#Import-Useful-Libraries" data-toc-modified-id="Import-Useful-Libraries-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Import Useful Libraries</a></div><div class="lev1 toc-item"><a href="#Connect-to-Database" data-toc-modified-id="Connect-to-Database-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Connect to Database</a></div><div class="lev1 toc-item"><a href="#Autofill-Table-Names" data-toc-modified-id="Autofill-Table-Names-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Autofill Table Names</a></div><div class="lev1 toc-item"><a href="#Thread-Manager" data-toc-modified-id="Thread-Manager-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Thread Manager</a></div><div class="lev1 toc-item"><a href="#Magic-Functions-Useful-for-Interacting-with-the-Cluster" data-toc-modified-id="Magic-Functions-Useful-for-Interacting-with-the-Cluster-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Magic Functions Useful for Interacting with the Cluster</a></div>

# Import Useful Libraries

In [1]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
from __future__ import division
import getopt
from itertools import izip
import os
import re
import sys
from textwrap import dedent
import threading
import time
import urllib

from IPython.core.display import display, HTML
from IPython.core.magic import register_cell_magic,\
                               register_line_cell_magic,\
                               register_line_magic
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandas.io.sql as psql
import psycopg2
import seaborn as sns

# Credentials file to connect to database
import credentials
from mpp_plotting_functions import *
from sql_functions import *

In [2]:
# Changes logo to a Pivotal logo
jPrefs = urllib.urlopen("https://raw.githubusercontent.com/crawles/Logos/master/jupyterPrefs.js").read()
HTML('<script>{}</script>'.format(jPrefs))

In [3]:
# Set default cell width
display(HTML('<style>.container {width:80% !important;}</style>'))

# Set default matplotlib settings
plt.rcParams['figure.figsize'] = (10, 7)
plt.rcParams['lines.linewidth'] = 3
plt.rcParams['figure.titlesize'] = 26
plt.rcParams['axes.labelsize'] = 18
plt.rcParams['axes.titlesize'] = 22
plt.rcParams['xtick.labelsize'] = 14
plt.rcParams['ytick.labelsize'] = 14
plt.rcParams['legend.fontsize'] = 16

# Set seaborn colours
sns.set_palette('colorblind')
blue, green, red, purple, yellow, cyan = sns.color_palette('colorblind')

# Connect to Database

In [4]:
conn = psycopg2.connect(**credentials.login_info_dict)
conn.autocommit = True

# Set the schema name
schema_name = 'template'
psql.execute('SET search_path TO {}'.format(schema_name), conn)

<cursor object at 0x117864e50; closed: 0>

# Autofill Table Names
One downside of interacting with a remote database in Python is that table names will not be imported in. When this is run, it will import all of the schema names as classes and their respective table names as variables. That way, when we type a schema name, we can use tab completion to list out all of its columns.

In [5]:
class Schema:
    def __init__(self, tables):
        for t in tables:
            exec('self.{t} = "{t}"'.format(t=t))

def refresh_tables(conn):
    """Refreshes the auto-fill tables."""
    sql = '''
    SELECT table_schema, array_agg(table_name::TEXT) AS tables
      FROM information_schema.tables
     GROUP BY table_schema;
    '''
    info_df = psql.read_sql(sql, conn)

    for row in info_df.iterrows():
        command = '''
        global {s}
        {s} = Schema({tables})
        '''.format(s=row[1][0], tables=row[1][1])
        exec(dedent(command))

# Thread Manager
This class provides a framework to manage concurrent threads. The `%%background` magic function will automatically call the `ThreadManager` when opening and close threads. We can use the `ThreadManager` to view all current and past threads including start and finish times and any comments associated to the threads.

In [6]:
class ThreadManager:
    def __call__(self, num_rows=5):
        """Returns the num_rows most recent threads (Default: 5)"""
        return self.thread_df.tail(num_rows).iloc[::-1]
    
    def __init__(self):
        df_cols = ['start_time', 'finish_time', 'exec_time',
                   'cell_text', 'comment']
        self.thread_df = pd.DataFrame(columns=df_cols)
        
    def add_thread(self, cell_text, comment='N/A'):
        thread_id = self.get_next_thread_id()
        start_time = datetime.now()
        self.thread_df.loc[thread_id] = [start_time, '', '', cell_text, comment]
        print 'Started Thread {} at {}.\nComment: {}'\
            .format(thread_id, start_time, comment)
        
    def finish_thread(self, thread_id):
        if self.thread_df.loc[thread_id, 'finish_time'] == '':
            # Set finish time
            finish_time = datetime.now()
            self.thread_df.loc[thread_id, 'finish_time'] = finish_time
            
            # Set execution time
            exec_time = finish_time - self.thread_df.loc[thread_id, 'start_time']
            self.thread_df.loc[thread_id, 'exec_time'] = exec_time
            
            # Print comment
            comment = str(self.thread_df.loc[thread_id, 'comment'])
            print 'Finished Thread {} at {}.\nDone in {}.\nComment: {}'\
                .format(thread_id, finish_time, exec_time, comment)
                        
        else:
            raise Exception('Cannot finish an already completed thread.')
        
    def raise_thread_error(self, thread_id, error_message):
        if self.thread_df.loc[thread_id, 'finish_time'] == '':
            self.thread_df.loc[thread_id, 'finish_time'] = 'Exception: {}'.format(error_message)
            self.thread_df.loc[thread_id, 'exec_time'] = 'Exception: {}'.format(error_message)
        
    def get_next_thread_id(self):
        return self.thread_df.shape[0]
    
    def get_finished_threads(self):
        return self.thread_df[self.thread_df['finish_time'] != '']

    def get_unfinished_threads(self):
        return self.thread_df[self.thread_df['finish_time'] == '']
    
thread_manager = ThreadManager()

# Magic Functions Useful for Interacting with the Cluster
These functions allow us to type write and run raw SQL a cell with the magic function at the top.

In [7]:
@register_cell_magic
def readsql(line, cell):
    """
    Extract the code in the specific cell (should be valid SQL), and 
    execute it using the connection object to the backend  database.
    The resulting pandas DataFrame is rendered inline  below the cell
    using IPython.display. You'd use this for SELECT.
    
    Returns a DataFrame with the name specified in the magic function.
    If this is not specified, then the DataFrame is called _df. This
    also takes in an option "-h", followed by a number. This will show
    only the specified number of rows in the DataFrame.
    """
    
    # Use the global connection object defined above.
    global conn
    optlist, args = getopt.getopt(line.split(), 'ih:')
    optdict = dict(optlist) 
    # If '-h' tag is specified, set the number of rows to display
    if '-h' in optdict:
        head_num = int(optdict['-h'])
    
    # Do string formatting. If a PL/Python function
    # is being created, then it should not try and
    # format whatever is inside the function.
    split_cell = cell.split('$')
    if '-i' not in optdict:
        if len(split_cell) > 1:
            split_cell[0] = split_cell[0].format(**globals())
            split_cell[-1] = split_cell[-1].format(**globals())
            cell = '$'.join(split_cell)
        elif len(split_cell) == 1:
            cell = cell.format(**globals()) 
    
    # If there is more than one table name specified,
    # throw an exception.
    if len(args) > 1:
        raise Exception('More than one table name specified.')

    elif len(args) == 1:
        # If a table name is specified, store it as that
        table_name = args[0]
        globals()[table_name] = psql.read_sql(cell, conn)
        if '-h' in optdict:
            # If head_num is not 0, then display rows
            if head_num != 0:
                display(globals()[table_name].head(int(optdict['-h'])))
        else:
            display(globals()[table_name])

    else:
        # Otherwise, call it _df
        global _df
        _df = psql.read_sql(cell, conn)
        if '-h' in optdict:
            # If head_num is not 0, then display rows
            if head_num != 0:
                display(_df.head(head_num))
        else:
            display(_df)
            
    refresh_tables(conn)

@register_cell_magic
def execsql(line, cell):
    """
    Extract the code in the specific cell (should be valid SQL), and
    execute it using the connection object to the backend  database.
    You'd use this for CREATE/UPDATE/DELETE.
    """
    
    # Use the global connection object defined above.
    global conn
    optlist, args = getopt.getopt(line.split(), 'ih:')
    optdict = dict(optlist)
    
    # Do string formatting. If a PL/Python function
    # is being created, then it should not try and
    # format whatever is inside the function.
    split_cell = cell.split('$')
    if '-i' not in optdict:
        if len(split_cell) > 1:
            split_cell[0] = split_cell[0].format(**globals())
            split_cell[-1] = split_cell[-1].format(**globals())
            cell = '$'.join(split_cell)
        elif len(split_cell) == 1:
            cell = cell.format(**globals())
    psql.execute(cell, conn)
    refresh_tables(conn)

@register_cell_magic
def printsql(line, cell):
    """Show the SQL query that will be run."""
    
    optlist, args = getopt.getopt(line.split(), 'ih:')
    optdict = dict(optlist)
    
    # Do string formatting. If a PL/Python function
    # is being created, then it should not try and
    # format whatever is inside the function.
    split_cell = cell.split('$')
    if '-i' not in optdict:
        if len(split_cell) > 1:
            split_cell[0] = split_cell[0].format(**globals())
            split_cell[-1] = split_cell[-1].format(**globals())
            cell = '$'.join(split_cell)
        elif len(split_cell) == 1:
            cell = cell.format(**globals())
    print cell
    
@register_cell_magic
def background(line, cell):
    """Runs whatever is in the cell in a separate thread. This allows
    the user to run cells in the background so that additional cells
    can be run concurrently. This will also micromanage by labelling 
    each thread with an id number.
    
    Whatever follows after specifying '%%background' will be used as a
    comment to label the process if the id number is not descriptive
    enough.
    """
    def is_useful_code(code):
        """Returns True if code is useful to use. Code that is
        considered not useful is code that contains only commented
        lines or empty lines."""

        lines = code.split('\n')

        # For each line, check if line equals '' or starts with a '#'
        return not np.all([line == '' or line[0] == '#' for line in lines])
    
    def run_cell(cell_value, line_value, thread_id):
        try:
            exec cell_value in globals()
        except Exception as error_message:
            thread_manager.raise_thread_error(thread_id, error_message)
            raise Exception(error_message)
            
        thread_manager.finish_thread(thread_id)
        
    # Splits the code into separate threads by lines that start with
    # two or more '#' characters. 
    cell_list = re.split('(?:^#{2,}|\n#{2,}).*\n', cell)
    cell_list = [sub_cell
                 for sub_cell in cell_list
                 if is_useful_code(sub_cell)]
    
    # Find comment fields, which are defined by lines with two or more
    # '#' characters, spaces, any character (except '#' and '\n'), then
    # a terminating '\n'.
    comment_list = re.findall('(^#{2,}|\n#{2,})(\s)*([^#\n]+)\n', cell)
    comment_list = [x[2] for x in comment_list]

    for sub_cell, sub_comment in izip(cell_list, comment_list):
        thread_id = thread_manager.get_next_thread_id()

        # Add thread to thread manager
        if len(line) > 0:
            # Format final thread comment
            thread_comment = '{} - {}'.format(line, sub_comment)
            thread_manager.add_thread(cell_text=sub_cell,
                                      comment=thread_comment)
        else:
            thread_manager.add_thread(cell_text=sub_cell)

        # Run the thread in the background
        thread = threading.Thread(target=run_cell,
                                  args=(sub_cell, line, thread_id)
                                 )
        thread.start()

# We delete these to avoid name conflicts for automagic to work
del execsql, readsql, printsql, background