In [None]:
import pandas as pd
from sqlalchemy import create_engine, text
from IPython.display import display, HTML
import MySQLdb
import datetime
import re
import csv
import subprocess, os

## Introduction

This notebook creates the data that feeds into the different models used for linking SED to UMETRICS.

We start with the tables of the starmetrics database, which are updated outside the scope of this project. Our first task is to create the employee_transactions table. This table feeds a Python function that builds the input table for both the Fellegi-Sunter training-data model and the Random Forest models.



In [1]:
def createEmployeeTransaction(db, empTableName):
    """Create ``starmetricsnew.<empTableName>`` from the raw starmetrics tables.

    The resulting table feeds the Random Forest input builder. It is faster to
    run these statements directly in MySQL, so prefer the SQL command line
    whenever possible.

    The table contains every ``starmetrics.employee`` row, with columns renamed
    (``period_start_date``, ``x_employee_id``, ``x_unique_award_number``, ...).
    Each row is then extended as follows:

    * ``__award_id`` comes from ``starmetricsnew.award`` (LEFT JOIN on the unique
      award number, so it may be NULL).
    * ``__employee_id`` comes from ``starmetrics.employeeid_xwalk`` (INNER JOIN on
      employee id + university, so transactions without a crosswalk entry are
      dropped).
    * ``occupationalclassification`` comes from ``starmetrics.occupation_9222015``
      (LEFT JOIN on university + original occupation, so it may be NULL).

    The intermediate tables ``temp1``, ``temp2``, ``temp3`` and ``awardtemp`` in
    ``starmetricsnew`` are dropped before and after the build.

    Args:
        db: object whose ``engine`` provides ``execute`` (SQLAlchemy 1.x style).
        empTableName: table to (re)create in ``starmetricsnew``. Any existing
            table of that name is dropped first.
    """
    def _run(sql):
        # Engine.execute() treats keyword arguments as bind parameters, so an
        # ``autocommit=True`` keyword never reaches the execution options;
        # request autocommit explicitly instead.
        db.engine.execute(text(sql).execution_options(autocommit=True))

    def _drop(table):
        _run("drop table if exists starmetricsnew." + table + ";")

    # Start from a clean slate (leftovers of an earlier, possibly failed, run).
    for table in ("temp1", "temp2", "temp3", empTableName, "awardtemp"):
        _drop(table)

    # temp1: raw employee transactions with columns renamed for the models.
    _run("""CREATE TABLE starmetricsnew.temp1 AS SELECT e.id AS employee_transaction_id,
    periodstartdate AS period_start_date,
    periodenddate AS period_end_date,
    ftestatus AS fte_status,
    proportionofearnings AS proportion_of_earnings,
    recipientaccountnumber AS recipient_account_number,
    employeeid AS x_employee_id,
    uniqueawardnumber AS x_unique_award_number,
    occupation_orig AS x_occupational_classification,
    cfda AS x_cfda,
    university FROM
    starmetrics.employee e;""")
    _run("""ALTER TABLE `starmetricsnew`.`temp1`
    ADD INDEX `uniq` (`x_unique_award_number` ASC)  COMMENT '';""")

    # awardtemp: one award id per unique award number. MIN() makes the pick
    # deterministic and keeps the statement valid under ONLY_FULL_GROUP_BY,
    # which rejects a bare non-aggregated column in a GROUP BY select list.
    _run("""CREATE TABLE starmetricsnew.awardtemp AS
    SELECT MIN(__award_id) AS __award_id, unique_award_number
    FROM starmetricsnew.award
    GROUP BY unique_award_number;""")
    _run("""ALTER TABLE `starmetricsnew`.`awardtemp`
    ADD INDEX `uniq` (`unique_award_number` ASC)  COMMENT '';""")

    # temp2: attach the award id (NULL when the award number is unknown).
    _run("""CREATE TABLE starmetricsnew.temp2 AS SELECT t1.*, c.__award_id FROM
    starmetricsnew.temp1 t1
        LEFT JOIN
    starmetricsnew.awardtemp c ON c.unique_award_number = t1.x_unique_award_number;""")
    _run("""ALTER TABLE `starmetricsnew`.`temp2`
    ADD INDEX `emp` (`x_employee_id` ASC, `university` ASC)  COMMENT '';""")

    # temp3: attach the crosswalked employee id; unmatched transactions drop out.
    _run("""CREATE TABLE starmetricsnew.temp3 AS SELECT t2.*, ex.__employee_id FROM
    starmetricsnew.temp2 t2
        INNER JOIN
    starmetrics.employeeid_xwalk ex ON ex.employeeid = t2.x_employee_id
        AND ex.university = t2.university;""")
    _run("""ALTER TABLE `starmetricsnew`.`temp3`
    ADD INDEX `occ` (`x_occupational_classification` ASC, `university` ASC)  COMMENT '';""")

    # Final table: attach the bucketed occupational classification (may be NULL).
    _run("""CREATE TABLE starmetricsnew.""" + empTableName + """ AS SELECT t3.*, o.occupationalclassification FROM
    starmetricsnew.temp3 t3
        LEFT JOIN
    starmetrics.occupation_9222015 o ON o.university = t3.university
        AND o.occupation_orig = t3.x_occupational_classification;""")

    # Drop the intermediate tables.
    for table in ("awardtemp", "temp1", "temp2", "temp3"):
        _drop(table)
    



In [None]:
# Let's create the employee transaction table.
# NOTE(review): this cell writes starmetricsnew.employee_transactions_test, while
# sm_init (below) reads starmetricsnew.employee_transaction_11242015. Confirm which
# table the pipeline is meant to use.
dbalchemy, dbmysql = dbConnect()
createEmployeeTransaction(db=dbalchemy, empTableName="employee_transactions_test")

Now that we have created the employee_transactions table, we need to create the input table that will serve as the source of data for all of our models.


In [None]:
class Employee(object):
    """Running per-employee summary built from employee-transaction rows.

    Usage:

    1. Create the object from an employee's first transaction row.
    2. Pass every further row for that employee to ``addtransaction``.
    3. Call ``todict`` to get one record shaped for ``air.sm_input_sed_temp``.

    Each row is a mapping (a MySQLdb dict-cursor row in this notebook). It must
    provide these keys:

    * ``__employee_id``, ``university``, ``last_name``, ``first_name``,
      ``middle_name``, ``dob``
    * ``period_start_date``, ``period_end_date``
    * ``occupationalclassification`` (may be None)
    * ``x_occupational_classification``
    """

    def __init__(self, row):
        # Identity fields are taken from the first row only.
        self.__employee_id = row['__employee_id']
        self.university = row['university']
        self.last_name = row['last_name']
        self.first_name = row['first_name']
        self.middle_name = row['middle_name']
        self.dob = row['dob']

        # Overall employment span and the occupations seen at its two ends.
        self.min_period_start_date = row['period_start_date']
        self.max_period_end_date = row['period_end_date']
        self.first_appear_bucketed_occup = row['occupationalclassification']
        self.first_appear_orig_occup = row['x_occupational_classification']
        self.last_appear_bucketed_occup = row['occupationalclassification']
        self.last_appear_orig_occup = row['x_occupational_classification']

        # Graduate-student span: None until a graduate row is seen. Using None
        # instead of sentinel datetime.date values avoids comparing
        # datetime.datetime with datetime.date, which raises TypeError.
        self.min_grad_period_start_date = None
        self.max_grad_period_end_date = None
        self.first_grad_orig_occup = None
        self.last_grad_orig_occup = None
        self._update_grad_span(row)

    @staticmethod
    def _is_grad(row):
        """Return True if the row's bucketed occupation starts with 'graduate' (any case).

        ``occupationalclassification`` may be None (for example when the table
        was built with a LEFT JOIN on the occupation lookup). Such rows count as
        non-graduate instead of raising AttributeError.
        """
        occupation = row['occupationalclassification']
        return bool(occupation) and occupation.lower().startswith('graduate')

    def _update_grad_span(self, row):
        """Widen the graduate span with ``row`` when it is a graduate transaction."""
        if not self._is_grad(row):
            return
        start = row['period_start_date']
        end = row['period_end_date']
        if self.min_grad_period_start_date is None or start < self.min_grad_period_start_date:
            self.min_grad_period_start_date = start
            self.first_grad_orig_occup = row['x_occupational_classification']
        if self.max_grad_period_end_date is None or end > self.max_grad_period_end_date:
            self.max_grad_period_end_date = end
            self.last_grad_orig_occup = row['x_occupational_classification']

    def addtransaction(self, row):
        """Fold one more transaction row for this employee into the summary.

        Comparisons are strict, so on ties the earlier-seen row's occupation wins.
        """
        if row['period_start_date'] < self.min_period_start_date:
            self.min_period_start_date = row['period_start_date']
            self.first_appear_bucketed_occup = row['occupationalclassification']
            self.first_appear_orig_occup = row['x_occupational_classification']

        if row['period_end_date'] > self.max_period_end_date:
            self.max_period_end_date = row['period_end_date']
            self.last_appear_bucketed_occup = row['occupationalclassification']
            self.last_appear_orig_occup = row['x_occupational_classification']

        self._update_grad_span(row)

    def todict(self):
        """Return the summary as a dict keyed by ``air.sm_input_sed_temp`` column names.

        The graduate fields (dates, occupations, ``max_grad_year``) are None when
        no graduate transaction was seen.
        """
        if self.max_grad_period_end_date is not None:
            max_grad_year = self.max_grad_period_end_date.year
        else:
            max_grad_year = None

        return {"__employee_id": self.__employee_id,
                "university": self.university,
                "last_name": self.last_name,
                "first_name": self.first_name,
                "middle_name": self.middle_name,
                "dob": self.dob,
                "max_grad_year": max_grad_year,
                "min_period_start_date": self.min_period_start_date,
                "max_period_end_date": self.max_period_end_date,
                "first_appear_bucketed_occup": self.first_appear_bucketed_occup,
                "first_appear_orig_occup": self.first_appear_orig_occup,
                "first_appear_date": self.min_period_start_date,
                "first_appear_as_grad_date": self.min_grad_period_start_date,
                "first_appear_as_grad_orig_occup": self.first_grad_orig_occup,
                "last_appear_as_grad_date": self.max_grad_period_end_date,
                "last_appear_as_grad_orig_occup": self.last_grad_orig_occup,
                "last_appear_bucketed_occup": self.last_appear_bucketed_occup,
                "last_appear_orig_occup": self.last_appear_orig_occup}
    
            
def sm_get_employees(db, empTransTable="employee_transaction_11242015"):
    """Aggregate every employee's transactions into one ``Employee`` record.

    Streams ``starmetricsnew.<empTransTable>`` joined to
    ``starmetrics.employee_name`` (on employee id + university) through a
    server-side dict cursor. The first row seen for an ``__employee_id`` creates
    its ``Employee``; later rows are passed to ``addtransaction``.

    Args:
        db: an open MySQLdb connection.
        empTransTable: employee-transaction table in the ``starmetricsnew``
            schema. The default is the table this notebook originally hard-coded.

    Returns:
        dict mapping ``__employee_id`` to its ``Employee``.

    Raises:
        ValueError: if ``empTransTable`` is not a plain identifier.
    """
    # The table name is spliced into the SQL (identifiers cannot be bound as
    # parameters), so accept only word characters.
    if not re.match(r"\w+\Z", empTransTable):
        raise ValueError("invalid table name: %r" % (empTransTable,))

    employees = {}
    cur = db.cursor(MySQLdb.cursors.SSDictCursor)
    try:
        cur.execute("""SELECT et.*, en.lastname as last_name, en.dob, en.firstname as first_name, middlename as middle_name
                FROM starmetricsnew.""" + empTransTable + """ et
                inner join starmetrics.employee_name en
                on en.employeeid = et.x_employee_id AND
                en.university = et.university;""")

        # fetchone() returns None once the result set is exhausted.
        for row in iter(cur.fetchone, None):
            employee_id = row["__employee_id"]
            if employee_id in employees:
                employees[employee_id].addtransaction(row)
            else:
                employees[employee_id] = Employee(row)
    finally:
        # Release the server-side result set even if the fetch loop fails.
        cur.close()

    return employees

def sm_names_init(db):
    """(Re)create the empty staging table ``air.sm_input_sed_temp``.

    Any existing table of that name is dropped first. The new table is keyed on
    ``__employee_id`` and has the columns that ``sm_insert_employees`` fills.

    Args:
        db: an open DB-API connection (MySQLdb in this notebook).
    """
    drop_stmt = "drop table if exists air.sm_input_sed_temp"
    create_stmt = """create table air.sm_input_sed_temp (
                   __employee_id int unsigned not null,
                   university varchar(45) null,
                   last_name varchar(50) null,
                   first_name varchar(50) null,
                   middle_name varchar(50) null,
                   dob date, 
                   max_grad_year int null,
                   min_period_start_date date not null,
                   max_period_end_date date not null,
                   first_appear_bucketed_occup varchar(100) null,
                   first_appear_orig_occup varchar(100) null,
                   first_appear_date date null,
                   first_appear_as_grad_date date null,
                   first_appear_as_grad_orig_occup varchar(100) null,
                   last_appear_as_grad_date date,
                   last_appear_as_grad_orig_occup varchar(100),
                   last_appear_bucketed_occup varchar(100) null,
                   last_appear_orig_occup varchar(100) null,
                   primary key (__employee_id))"""

    cursor = db.cursor()
    for stmt in (drop_stmt, create_stmt):
        cursor.execute(stmt)

def sm_insert_employees(db, employees):
    """Insert one row per employee into ``air.sm_input_sed_temp`` and commit.

    Args:
        db: an open DB-API connection using the ``%s`` paramstyle (MySQLdb
            in this notebook).
        employees: mapping whose values provide ``todict()``, returning a dict
            that contains every column listed in ``insert_columns``.
    """
    insert_columns = ["__employee_id", "university", "last_name", "first_name", "middle_name", "dob", "max_grad_year",
                      "min_period_start_date", "max_period_end_date",
                      "first_appear_bucketed_occup", "first_appear_orig_occup",
                      "first_appear_date", "first_appear_as_grad_date", "first_appear_as_grad_orig_occup",
                      "last_appear_as_grad_date", "last_appear_as_grad_orig_occup",
                      "last_appear_bucketed_occup", "last_appear_orig_occup"]

    sql = "insert into air.sm_input_sed_temp (" + ", ".join(insert_columns) + \
          ") values (" + ", ".join(["%s"] * len(insert_columns)) + ")"

    rows = []
    for employee in employees.values():
        record = employee.todict()
        rows.append([record[column] for column in insert_columns])

    cur = db.cursor()
    try:
        if rows:
            # A single executemany call instead of one execute round-trip per
            # employee; MySQLdb optimizes executemany for multi-row INSERTs.
            cur.executemany(sql, rows)
    finally:
        cur.close()

    db.commit()

def daysWorked(db, empTransTable):
    """Add days-worked totals to the staged input table and publish ``air.sm_input_sed``.

    Reads ``starmetricsnew.<empTransTable>`` and builds two per-employee tables
    in ``starmetricsnew``. Both sum inclusive day counts
    (``DATEDIFF(end, start) + 1``) over each employee's distinct
    (start, end) periods:

    * ``graddays.days_worked_as_grad``: only periods whose
      ``occupationalclassification`` is exactly 'Graduate Student'.
    * ``totaldays.days_worked``: all periods.

    ``air.sm_input_sed`` is then recreated as ``air.sm_input_sed_temp``
    left-joined to both tables on ``__employee_id``. After that,
    ``air.sm_input_sed_temp`` is dropped. ``graddays`` and ``totaldays`` are
    kept; the helper tables ``temp1`` and ``temp2`` are dropped.

    NOTE(review):
    * Only identical periods are de-duplicated, so overlapping periods for the
      same employee are counted more than once.
    * The graduate filter here is an exact match, while ``Employee`` uses a
      case-insensitive ``startswith('graduate')``. Confirm the two should agree.

    Args:
        db: object whose ``engine`` provides ``execute`` (SQLAlchemy 1.x style).
        empTransTable: employee-transaction table in ``starmetricsnew``.
    """
    def _run(sql):
        # Engine.execute() treats keyword arguments as bind parameters, so an
        # ``autocommit=True`` keyword (misspelled ``autocommite`` in one call
        # of the previous version) never reached the execution options;
        # request autocommit explicitly instead.
        db.engine.execute(text(sql).execution_options(autocommit=True))

    def _drop_temps():
        _run("drop table if exists starmetricsnew.temp1;")
        _run("drop table if exists starmetricsnew.temp2;")

    def _sum_period_days(target_table, total_column, where_clause):
        # Build starmetricsnew.<target_table>(__employee_id, <total_column>) from
        # the distinct periods matching where_clause, indexed on __employee_id.
        _drop_temps()
        _run("CREATE TABLE starmetricsnew.temp1 AS "
             "SELECT __employee_id, period_start_date, period_end_date "
             "FROM starmetricsnew.{src} {where} "
             "GROUP BY __employee_id, period_start_date, period_end_date;".format(
                 src=empTransTable, where=where_clause))
        _run("CREATE TABLE starmetricsnew.temp2 AS "
             "SELECT t1.*, DATEDIFF(period_end_date, period_start_date) + 1 AS days_worked "
             "FROM starmetricsnew.temp1 t1;")
        _run("CREATE TABLE starmetricsnew.{dst} AS "
             "SELECT __employee_id, SUM(days_worked) AS {col} "
             "FROM starmetricsnew.temp2 GROUP BY __employee_id;".format(
                 dst=target_table, col=total_column))
        _run("ALTER TABLE `starmetricsnew`.`{dst}` "
             "ADD INDEX `emp` (`__employee_id` ASC) COMMENT '';".format(dst=target_table))
        _drop_temps()

    _run("drop table if exists starmetricsnew.graddays;")
    _run("drop table if exists starmetricsnew.totaldays;")

    # Single quotes: double-quoted strings become identifiers under ANSI_QUOTES.
    _sum_period_days("graddays", "days_worked_as_grad",
                     "WHERE occupationalclassification = 'Graduate Student'")
    _sum_period_days("totaldays", "days_worked", "")

    # Attach both totals to the staged input table, publish it, drop the stage.
    _run("Drop table if exists air.sm_input_sed;")
    _run("""Create Table air.sm_input_sed AS SELECT
                        si.*, g.days_worked_as_grad, t.days_worked FROM air.sm_input_sed_temp si
                        left join starmetricsnew.graddays g USING (__employee_id)
                        left join starmetricsnew.totaldays t USING (__employee_id);""")
    _run("Drop table if exists air.sm_input_sed_temp;")
    
def sm_init(dbmysql, dbalchemy):
    """Build ``air.sm_input_sed`` end to end.

    Steps, in order:

    1. Recreate the empty ``air.sm_input_sed_temp`` table (MySQLdb connection).
    2. Aggregate the employee transactions into per-employee records and insert
       them into that table (MySQLdb connection).
    3. Add total and graduate days worked, create ``air.sm_input_sed`` and drop
       the staging table (SQLAlchemy connection).
    """
    # Must name the same table that sm_get_employees reads its transactions from.
    transactions_table = "employee_transaction_11242015"

    sm_names_init(dbmysql)
    employee_records = sm_get_employees(dbmysql)
    sm_insert_employees(dbmysql, employee_records)
    daysWorked(dbalchemy, transactions_table)
    


In [None]:
# Build air.sm_input_sed, the shared input table for all models.
dbalchemy, dbmysql = dbConnect()
sm_init(dbmysql=dbmysql, dbalchemy=dbalchemy)