 # Table of Contents
<div class="toc" style="margin-top: 1em;"><ul class="toc-item" id="toc-level0"></ul></div>

In [1]:
from datetime import date, datetime, time
from dateutil.relativedelta import relativedelta
import getopt
from itertools import chain, izip, izip_longest, product
import os
import re
import sys
from textwrap import dedent
import threading
import time
import urllib

import graphviz
from IPython.core.display import display, Image, HTML
from IPython.core.magic import register_cell_magic,\
                               register_line_cell_magic,\
                               register_line_magic
        
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import pandas.io.sql as psql
import psycopg2
import seaborn as sns
from sqlalchemy import create_engine, Column, MetaData, Table
from sqlalchemy import all_, and_, any_, not_, or_
from sqlalchemy import alias, between, case, cast, column, false, func,\
                       intersect, literal, literal_column, select, text, true
from sqlalchemy import BigInteger, Boolean, Date, DateTime, Integer, Interval, Float,\
                       Numeric, String

import credentials
from mpp_plotting import *
from sql_functions import *

In [2]:
# Use seaborn's 'colorblind' palette as the default color cycle for the plots below.
sns.set_palette('colorblind')

In [3]:
# Psycopg2 connection, built from the keyword arguments stored in
# credentials.login_info_dict. Autocommit is enabled on the connection.
conn = psycopg2.connect(**credentials.login_info_dict)
conn.autocommit = True

# SQLAlchemy connection
# The URL is assembled from the user / password / host / dbname entries of
# the same credentials dict.
# NOTE(review): the values are interpolated verbatim; a password containing
# characters such as '@', ':' or '/' presumably needs URL-escaping before it
# is placed in the URL -- confirm against the SQLAlchemy database-URL docs.
engine_str = 'postgresql://{user}:{password}@{host}/{dbname}'\
    .format(**credentials.login_info_dict)
engine = create_engine(engine_str)

# MetaData bound to the engine; used later for Table(..., autoload=True).
metadata = MetaData(engine)

In [4]:
class ThreadManager:
    """Keeps a log of code run in background threads.

    Every thread is one row of ``thread_df``, indexed by an integer thread
    ID that equals the number of rows present when the thread was added.
    Columns: start_time, finish_time, exec_time, cell_text, comment and
    error_message. An empty string in finish_time means "still running";
    an empty string in error_message means "no error recorded".

    Worker threads call finish_thread() / raise_thread_error() while the
    notebook thread adds rows and reads the log, so every access to
    ``thread_df`` goes through a re-entrant lock.
    """

    def __call__(self, num_rows=5):
        """Returns the num_rows most recent threads (Default: 5).

        Before returning, exec_time of every unfinished thread is refreshed
        to the time elapsed since its start. The result is a copy ordered
        newest first.
        """
        with self._lock:
            # Update exec time if thread has yet to finish
            now = datetime.now()
            for ix in self.thread_df.index:
                if self.thread_df.loc[ix, 'finish_time'] == '':
                    start_time = self.thread_df.loc[ix, 'start_time']
                    self.thread_df.loc[ix, 'exec_time'] = now - start_time

            # Copy so the caller's snapshot is not altered by later updates.
            return self.thread_df.tail(num_rows).iloc[::-1].copy()

    def __init__(self):
        df_cols = ['start_time', 'finish_time', 'exec_time',
                   'cell_text', 'comment', 'error_message']
        self.thread_df = pd.DataFrame(columns=df_cols)
        # Re-entrant because finish_thread() and raise_thread_error() call
        # _add_finish_times() while already holding the lock.
        self._lock = threading.RLock()

    def _add_finish_times(self, thread_id):
        """Adds the finish time and exec time to thread_df.

        Returns: a tuple of finish time, exec time
        """
        with self._lock:
            # Set finish time
            finish_time = datetime.now()
            self.thread_df.loc[thread_id, 'finish_time'] = finish_time

            # Set execution time
            start_time = self.thread_df.loc[thread_id, 'start_time']
            exec_time = finish_time - start_time
            self.thread_df.loc[thread_id, 'exec_time'] = exec_time

            return finish_time, exec_time

    def get_error_threads(self):
        """Returns a DataFrame of threads which threw an error."""
        with self._lock:
            return self.thread_df[self.thread_df['error_message'] != '']

    def get_finished_threads(self):
        """Returns a DataFrame of the finished threads."""
        with self._lock:
            return self.thread_df[self.thread_df['finish_time'] != '']

    def get_next_thread_id(self):
        """Returns an integer representing the ID for the next thread."""
        with self._lock:
            return self.thread_df.shape[0]

    def get_unfinished_threads(self):
        """Returns a DataFrame of the unfinished threads."""
        with self._lock:
            return self.thread_df[self.thread_df['finish_time'] == '']

    def add_thread(self, cell_text, comment='N/A'):
        """Registers a new thread and prints a start message.

        cell_text: the code the thread will run.
        comment: free-text label for the thread (Default: 'N/A').

        Returns: the ID assigned to the new thread.
        """
        start_time = datetime.now()
        with self._lock:
            # Allocate the ID and insert the row under one lock so two
            # callers can never be handed the same ID.
            thread_id = self.get_next_thread_id()
            self.thread_df.loc[thread_id] = [start_time, '', '',
                                             cell_text, comment, '']

        print('Started Thread {} at {}.\nComment: {}'
              .format(thread_id, start_time, comment))
        return thread_id

    def finish_thread(self, thread_id):
        """Marks a thread as finished and prints a completion message.

        Raises: Exception if the thread already has a finish time.
        """
        with self._lock:
            if self.thread_df.loc[thread_id, 'finish_time'] != '':
                raise Exception('Cannot finish an already completed thread.')

            finish_time, exec_time = self._add_finish_times(thread_id)
            comment = str(self.thread_df.loc[thread_id, 'comment'])

        # Print comment
        print('Finished Thread {} at {}.\nDone in {}.\nComment: {}'
              .format(thread_id, finish_time, exec_time, comment))

    def raise_thread_error(self, thread_id, error_message):
        """Records an error for a thread and stamps its finish time.

        Does nothing if the thread already has a finish time.
        """
        with self._lock:
            if self.thread_df.loc[thread_id, 'finish_time'] == '':
                exception_message = 'Exception: {}'.format(error_message)
                self._add_finish_times(thread_id)
                self.thread_df.loc[thread_id, 'error_message'] = \
                    exception_message

thread_manager = ThreadManager()

In [5]:
@register_cell_magic
def readsql(line, cell):
    """
    Run the cell body as a SQL query on the global connection and keep
    the result as a pandas DataFrame. Intended for SELECT statements.

    The magic line may name the global variable that receives the
    DataFrame; without a name the DataFrame is stored as _df. Naming
    more than one variable raises an Exception.

    Options:
      -h N  display only the first N rows (N = 0 displays nothing).
            Without -h the whole DataFrame is displayed.
      -i    skip the str.format() interpolation of the cell text.
    """

    # Use the global connection object defined above.
    global conn
    parsed_opts, names = getopt.getopt(line.split(), 'ih:')
    opts = dict(parsed_opts)

    # None means "-h was not given"; parsed up front so a bad value
    # fails before anything is run.
    head_num = int(opts['-h']) if '-h' in opts else None

    # Interpolate notebook globals into the SQL. When the cell contains
    # '$' (e.g. a dollar-quoted PL/Python body), only the text before the
    # first '$' and after the last '$' is formatted.
    if '-i' not in opts:
        pieces = cell.split('$')
        if len(pieces) > 1:
            pieces[0] = pieces[0].format(**globals())
            pieces[-1] = pieces[-1].format(**globals())
            cell = '$'.join(pieces)
        else:
            cell = cell.format(**globals())

    if len(names) > 1:
        raise Exception('More than one table name specified.')

    # Store under the requested name, or _df when none was given.
    target_name = names[0] if names else '_df'
    frame = psql.read_sql(cell, conn)
    globals()[target_name] = frame

    if head_num is None:
        display(frame)
    elif head_num != 0:
        display(frame.head(head_num))

    refresh_tables(conn)

@register_cell_magic
def execsql(line, cell):
    """
    Run the cell body as SQL on the global connection without reading
    back a result. Intended for CREATE/UPDATE/DELETE statements.

    Pass -i on the magic line to skip the str.format() interpolation
    of the cell text.
    """

    # Use the global connection object defined above.
    global conn
    flags = dict(getopt.getopt(line.split(), 'ih:')[0])

    # Interpolate notebook globals into the SQL. When the cell contains
    # '$' (e.g. a dollar-quoted PL/Python body), only the text before the
    # first '$' and after the last '$' is formatted.
    if '-i' not in flags:
        chunks = cell.split('$')
        if len(chunks) == 1:
            cell = cell.format(**globals())
        else:
            chunks[0] = chunks[0].format(**globals())
            chunks[-1] = chunks[-1].format(**globals())
            cell = '$'.join(chunks)

    psql.execute(cell, conn)
    refresh_tables(conn)

@register_cell_magic
def printsql(line, cell):
    """Print the SQL text that would be run, after interpolation.

    Pass -i on the magic line to print the cell text without the
    str.format() interpolation.
    """

    flags = dict(getopt.getopt(line.split(), 'ih:')[0])

    # Interpolate notebook globals into the SQL. When the cell contains
    # '$' (e.g. a dollar-quoted PL/Python body), only the text before the
    # first '$' and after the last '$' is formatted.
    if '-i' not in flags:
        chunks = cell.split('$')
        if len(chunks) == 1:
            cell = cell.format(**globals())
        else:
            chunks[0] = chunks[0].format(**globals())
            chunks[-1] = chunks[-1].format(**globals())
            cell = '$'.join(chunks)

    print(cell)
    
@register_cell_magic
def background(line, cell):
    """Runs whatever is in the cell in a separate thread. This allows
    the user to run cells in the background so that additional cells
    can be run concurrently. This will also micromanage by labelling 
    each thread with an id number.
    
    Whatever follows after specifying '%%background' will be used as a
    comment to label the process if the id number is not descriptive
    enough.
    """
    
    def is_useful_code(code):
        """Returns True if code is useful to use. Code that is
        considered not useful is code that contains only commented
        lines or empty lines."""

        lines = code.split('\n')

        # For each line, check if line equals '' or starts with a '#'
        return not np.all([line == '' or line[0] == '#' for line in lines])
    
    def run_cell(cell_value, line_value, thread_id):
        try:
            exec cell_value in globals()
        except Exception as error_message:
            thread_manager.raise_thread_error(thread_id, error_message)
            raise Exception(error_message)
            
        thread_manager.finish_thread(thread_id)
        
    def get_thread_comment(line, sub_comment):
        """Gets the final thread comment by looking at the main comment
        line and the sub-comments."""
        
        # Has main comment if the length of the line is greater than 0
        # or if the line is not all spaces
        has_main_comment = len(line) > 0 and not bool(re.match('^( )+$', line))
        
        # Has sub comment if it is not None and if it is not equal to
        # the blank string (The regex will handle cases of all spaces)
        has_sub_comment = sub_comment is not None and sub_comment != ''
        
        if has_main_comment:
            if has_sub_comment:
                return '{} - {}'.format(line, sub_comment)
            else:
                return line
                
        else:
            if has_sub_comment:
                return sub_comment
            else:
                return 'N/A'
        
    # Splits the code into separate threads by lines that start with
    # two or more '#' characters. 
    cell_list = re.split('(?:^#{2,}|\n#{2,}).*\n', cell)
    cell_list = [sub_cell
                     for sub_cell in cell_list
                         if is_useful_code(sub_cell)]
    
    # Find comment fields, which are defined by lines with two or more
    # '#' characters, spaces, any character (except '#' and '\n'), then
    # a terminating '\n'. Use non-capturing group to ignore '#'
    # characters and spaces before the comment.
    comment_list = re.findall('(?:^#{2,}|\n#{2,})(?: )*([^#\n]*)\n', cell)

    # If there is no break above the first one, comment list will be
    # smaller than cell list, and it will be offset by one.
    if len(comment_list) < len(cell_list):
        comment_list.insert(0, None)
        
    for sub_cell, sub_comment in izip_longest(cell_list, comment_list):
        thread_id = thread_manager.get_next_thread_id()

        # Add thread to thread manager
        thread_comment = get_thread_comment(line, sub_comment)
    
        thread_manager.add_thread(cell_text=sub_cell,
                                  comment=thread_comment
                                 )

        # Run the thread in the background
        thread = threading.Thread(target=run_cell,
                                  args=(sub_cell, line, thread_id)
                                 )
        thread.start()

# We delete these to avoid name conflicts for automagic to work.
# Only the Python names are removed here; the functions were already passed
# through @register_cell_magic above.
# NOTE(review): presumably the registered %%magics stay usable after this --
# confirm against the IPython magics documentation.
del execsql, readsql, printsql, background

In [None]:
# (Re)create two demo tables, test and test_2, with identical definitions:
# 10,000 rows each with two squared-random numeric columns (x, y), a text
# column, a two-valued category column ('first' for row numbers below 4000,
# otherwise 'second') and a date column.
sql = '''
DROP TABLE IF EXISTS test;
CREATE TABLE test
   AS SELECT random()^2 AS x,
             random()^2 AS y,
             'test_' || row_number() OVER () AS txt_col,
             CASE WHEN row_number() OVER () < 4000 THEN 'first'
                  ELSE 'second'
              END AS category,
             '2017-01-01'::DATE + INTERVAL '1 DAY' * row_number() OVER () AS date_col
        FROM generate_series(1, 10000);
        
DROP TABLE IF EXISTS test_2;
CREATE TABLE test_2
   AS SELECT random()^2 AS x,
             random()^2 AS y,
             'test_' || row_number() OVER () AS txt_col,
             CASE WHEN row_number() OVER () < 4000 THEN 'first'
                  ELSE 'second'
              END AS category,
             '2017-01-01'::DATE + INTERVAL '1 DAY' * row_number() OVER () AS date_col
        FROM generate_series(1, 10000);
'''
psql.execute(sql, conn)

In [None]:
# (Re)create test_date with a single date_col column: one row per entry of
# the hard-coded day-offset array, so the resulting dates repeat an uneven
# number of times.
sql = '''
DROP TABLE IF EXISTS test_date;
CREATE TABLE test_date
   AS SELECT (days * INTERVAL '1 DAY' + '2017-01-01'::DATE) AS date_col
        FROM (SELECT UNNEST(ARRAY[1, 1, 1, 1, 2, 2, 3, 3, 4, 5, 5, 5, 5]) AS days) AS foo
'''
psql.execute(sql, conn)

In [None]:
# Build SQLAlchemy Table objects for the demo tables (and for
# information_schema.columns) with autoload=True, i.e. column definitions
# are read from the database rather than declared here.
test = Table('test', metadata, autoload=True, schema='public')
test_2 = Table('test_2', metadata, autoload=True, schema='public')
test_date = Table('test_date', metadata, autoload=True, schema='public')
info_cols = Table('columns', metadata, autoload=True,
                  schema='information_schema')

In [None]:
# Numeric histogram of column x, identifying the table by its name (a string).
# get_histogram_values / plot_numeric_hists are not defined in this notebook;
# presumably they come from the star imports at the top.
hist_df = get_histogram_values('test', 'x', engine)
display(hist_df.head())
plot_numeric_hists(hist_df)

In [None]:
# Same histogram call, this time passing the SQLAlchemy Table object instead
# of the table name.
hist_df = get_histogram_values(test, 'x', engine)
display(hist_df.head())
plot_numeric_hists(hist_df)

In [None]:
# Histogram of the category column; the same DataFrame is passed twice to
# plot_categorical_hists as a two-element list.
hist_df = get_histogram_values(test, 'category', engine)
display(hist_df.head())
plot_categorical_hists([hist_df, hist_df])

In [None]:
# Merge the category histogram with itself on 'category' and draw the result
# as a pandas bar plot indexed by category.
hist_df\
    .merge(hist_df, on='category')\
    .set_index('category')\
    .plot(kind='bar')

In [None]:
# Draw the same category bar plot twice onto one shared Axes.
fig, ax = plt.subplots()
hist_df\
    .set_index('category')\
    .plot(kind='bar', ax=ax)
hist_df\
    .set_index('category')\
    .plot(kind='bar', ax=ax)

In [None]:
# Histogram of test_date.date_col, plotted with the date-specific helper.
hist_df = get_histogram_values(test_date, 'date_col', engine)
display(hist_df.head())
plot_date_hists(hist_df)

In [None]:
# Fetch scatterplot values for test.x against test.y, requesting 50 bins per axis.
scatter_df = get_scatterplot_values(test, 'x', 'y', engine, nbins=(50, 50))

In [None]:
# Plot the values with plot_scatterplot's default plot type.
plot_scatterplot(scatter_df)

In [None]:
# Plot the same values as a heatmap and add a matplotlib colorbar.
plot_scatterplot(scatter_df, plot_type='heatmap')
plt.colorbar()

In [9]:
# Replace test and test_2 with single-column id tables built from
# generate_series(0, 10) and generate_series(5, 15); they are used by the
# join examples below. This drops the wider demo tables created earlier.
sql = '''
DROP TABLE IF EXISTS test;
CREATE TABLE test
   AS SELECT generate_series(0, 10) AS id;
   
DROP TABLE IF EXISTS test_2;
CREATE TABLE test_2
   AS SELECT generate_series(5, 15) AS id;
'''
psql.execute(sql, conn)

<cursor object at 0x10fcdfc50; closed: 0>

In [10]:
# Start over with a new MetaData object.
# NOTE(review): presumably so the Table(...) calls below reflect the new
# single-column tables instead of reusing definitions registered on the
# earlier MetaData -- confirm.
metadata = MetaData(engine)

In [11]:
# Rebind test / test_2 to Table objects autoloaded through the new MetaData.
test = Table('test', metadata, autoload=True, schema='public')
test_2 = Table('test_2', metadata, autoload=True, schema='public')

In [17]:
# Join the two reflected tables on id, passing the join via from_obj, then
# print the generated SQL and read the result into a DataFrame.
# NOTE(review): the saved output for this cell shows a LIMIT clause that this
# statement does not add -- the output looks stale; re-run to refresh it.
sql = select([test.c.id, test_2.c.id], 
             from_obj=test.join(test_2, onclause=(test.c.id == test_2.c.id))
            )
    
print sql
psql.read_sql(sql, engine)

SELECT public.test.id, public.test_2.id 
FROM public.test JOIN public.test_2 ON public.test.id = public.test_2.id 
 LIMIT %(param_1)s


Unnamed: 0,id,id.1
0,5,5
1,6,6
2,7,7
3,8,8
4,9,9
5,10,10


In [21]:
# Wrap each table's id column in a SELECT aliased as foo / foo_2, for use as
# subqueries in the joins below.
test_ss = select([test.c.id]).alias('foo')
test_ss_2 = select([test_2.c.id]).alias('foo_2')

In [22]:
# Join the two aliased subqueries on id, building the join inline in from_obj.
sql = select([test_ss.c.id, test_ss_2.c.id], 
             from_obj=test_ss.join(test_ss_2, onclause=(test_ss.c.id == test_ss_2.c.id))
            )
print sql
psql.read_sql(sql, engine)

SELECT foo.id, foo_2.id 
FROM (SELECT public.test.id AS id 
FROM public.test) AS foo JOIN (SELECT public.test_2.id AS id 
FROM public.test_2) AS foo_2 ON foo.id = foo_2.id


Unnamed: 0,id,id.1
0,5,5
1,6,6
2,7,7
3,8,8
4,9,9
5,10,10


In [23]:
# Same subquery-to-subquery join, with the join first bound to join_table and
# then handed to from_obj.
join_table = test_ss.join(test_ss_2, onclause=(test_ss.c.id == test_ss_2.c.id))
sql = select([test_ss.c.id, test_ss_2.c.id], 
             from_obj=join_table
            )
print sql
psql.read_sql(sql, engine)

SELECT foo.id, foo_2.id 
FROM (SELECT public.test.id AS id 
FROM public.test) AS foo JOIN (SELECT public.test_2.id AS id 
FROM public.test_2) AS foo_2 ON foo.id = foo_2.id


Unnamed: 0,id,id.1
0,5,5
1,6,6
2,7,7
3,8,8
4,9,9
5,10,10


In [24]:
# Join the aliased subquery foo to the plain reflected table test_2 on id.
join_table = test_ss.join(test_2, onclause=(test_ss.c.id == test_2.c.id))
sql = select([test_ss.c.id, test_2.c.id], 
             from_obj=join_table
            )
print sql
psql.read_sql(sql, engine)

SELECT foo.id, public.test_2.id 
FROM (SELECT public.test.id AS id 
FROM public.test) AS foo JOIN public.test_2 ON foo.id = public.test_2.id


Unnamed: 0,id,id.1
0,5,5
1,6,6
2,7,7
3,8,8
4,9,9
5,10,10


In [26]:
# Same subquery-to-table join, attaching the join with .select_from() instead
# of the from_obj argument.
join_table = test_ss.join(test_2, onclause=(test_ss.c.id == test_2.c.id))
sql = select([test_ss.c.id, test_2.c.id])\
    .select_from(join_table)

print sql
psql.read_sql(sql, engine)

SELECT foo.id, public.test_2.id 
FROM (SELECT public.test.id AS id 
FROM public.test) AS foo JOIN public.test_2 ON foo.id = public.test_2.id


Unnamed: 0,id,id.1
0,5,5
1,6,6
2,7,7
3,8,8
4,9,9
5,10,10
