In [1]:
import pandas as pd
from pandas import DataFrame as DF, Series
import numpy as np
import psycopg2 as pg

import requests
import io
import json

In [2]:
def insert_df(df, name):
    """ Inserts rows from a dataframe into a postgres table.

        df : pandas dataframe; must contain every column of the target
             table (extra columns are ignored)
        name : name of the table in postgres

        Uses the module-level cursor `cur`. The statement is executed but
        not committed; committing is left to the caller.
    """
    global cur

    # Nothing to insert: an INSERT with an empty VALUES list is invalid SQL,
    # so report and return before touching the database.
    if len(df) == 0:
        print('Inserted 0 rows into {}'.format(name))
        return

    # `limit 0` still populates the cursor description, so the table's column
    # order is available without fetching any of its rows.
    cols = to_df('select * from {} limit 0'.format(name)).columns.tolist()

    # Plain tuples, in the table's column order, without the dataframe index.
    rows = list(df[cols].itertuples(index=False, name=None))

    # One "(%s, %s, ...)" group per row; mogrify lets psycopg2 quote each value.
    placeholders = '({})'.format(', '.join(len(cols) * ['%s']))
    row_values = ','.join(
        cur.mogrify(placeholders, r).decode('utf-8') for r in rows)

    q = 'INSERT INTO {} VALUES '.format(name.upper()) + row_values
    cur.execute(q)
    print('Inserted {} rows into {}'.format(len(df), name))

    
def to_df(q):
    """ Run a query on the shared cursor and return its rows as a dataframe.

        q : query string that will be executed by postgres

        Column labels are taken from the first field of each entry in the
        cursor's description.
    """
    global cur
    cur.execute(q)
    records = cur.fetchall()
    header = [field[0] for field in cur.description]
    return DF(records, columns=header)

In [3]:
def nan_to_null(f,
        _NULL=pg.extensions.AsIs('NULL'),
        _NaN=np.nan,
        _Float=pg.extensions.Float):
    """ psycopg2 adapter for floats that sends NaN as SQL NULL.

        f : the float being adapted
        _NULL, _NaN, _Float : bound once as defaults so the adapter does not
            repeat the attribute lookups on every value; not meant to be
            passed by callers

        Returns the NULL literal for NaN and the standard psycopg2 Float
        adapter for every other value.
    """
    # An identity test alone only matches the np.nan singleton. NaN is the
    # only float that is not equal to itself, so `f != f` also catches NaN
    # values that are distinct objects.
    if f is _NaN or f != f:
        return _NULL
    return _Float(f)

# Route every Python float through the NaN-aware adapter.
# (np.nan is used above because the np.NaN alias was removed in NumPy 2.0.)
pg.extensions.register_adapter(float, nan_to_null)

In [4]:
# Connect to the Postgres server as user `postgres` without naming a database.
# `cur` is the module-level cursor that to_df / insert_df reach via `global cur`.
db = pg.connect(host='0.0.0.0', port='5432', user='postgres')
cur = db.cursor()

## Get Data

In [5]:
# Folder of the hackoregon/elections-2018 repo that holds every scrape file.
SCRAPE_FILES_URL = ('https://raw.githubusercontent.com/hackoregon/'
                    'elections-2018/master/scrape_files/')

# Table name -> URL of the raw file that feeds it.
# The URLs are built without embedded newlines or indentation, so they are
# usable as-is rather than only after whitespace has been stripped.
urls = {
    'committee_history': SCRAPE_FILES_URL + 'committee_history_first_batch.csv',
    'committees_list': SCRAPE_FILES_URL + 'committees_list.tsv',
    'statement_of_org': SCRAPE_FILES_URL + 'statement_of_organization_first_batch.csv',
    'transactions': SCRAPE_FILES_URL + 'transactions_first_batch.csv',
    'transaction_details': SCRAPE_FILES_URL + 'trans_detail_first_batch_clean.csv',
    'election_activity': SCRAPE_FILES_URL + 'election_activity_first_batch.csv',
}

In [6]:
import sys

# Per-file overrides for pd.read_csv; files not listed use the defaults.
read_csv_options = {
    # NOTE(review): forces column 2 to str -- presumably to stop pandas from
    # guessing a numeric type for it; confirm against the file.
    'transaction_details': {'dtype': {2: str}},
    # committees_list is tab-separated.
    'committees_list': {'sep': '\t'},
}

# Download every file in `urls` and parse it into a dataframe keyed by table name.
all_dfs = {}
for name,url in urls.items():
    sys.stdout.write('\r{}: loading'.format(name))
    sys.stdout.flush()
    # The stored URLs may contain line breaks and indentation; strip them.
    r = requests.get(url.replace('\n', '').replace(' ', ''), timeout=60)
    # Fail loudly on 4xx/5xx instead of parsing an error page as CSV.
    r.raise_for_status()
    all_dfs[name] = pd.read_csv(io.StringIO(r.content.decode('utf-8')),
                                **read_csv_options.get(name, {}))
    sys.stdout.write('\r{}: complete'.format(name))
    sys.stdout.flush()
    print()

transaction_details: complete
transactions: complete
committees_list: complete
statement_of_org: complete
committee_history: complete
election_activity: complete


### Create Dummy Data for Other Tables

In [7]:
import json

In [8]:
# donor: one placeholder row; the categories are stored as a JSON string
data = dict(donor_id=123,
            donor_name='Cookie Monster',
            donor_address='456 Sesame Street',
            donor_type="Trashcan Donor",
            donor_cats=json.dumps({'categories': ['trash', 'can', 'trashcan']}))
all_dfs['donor'] = DF([data])

# payee: one placeholder row
data = dict(payee_id=789, payee_name='James Bond')
all_dfs['payee'] = DF([data])

# ballots: one JSON document per candidate, keyed by the candidate's name
d1 = dict(
    name='Jane Smith',
    campaigns=[
        dict(year=2016, pos='myr',
             votes=dict(county_a=300, county_b=480)),
        dict(year=2017, pos='gov',
             votes=dict(county_a=750, county_b=800, county_c=650)),
    ],
)
d2 = dict(
    name='John Doe',
    campaigns=[
        dict(year=2016, pos='myr',
             votes=dict(county_a=150, county_b=370)),
        dict(year=2017, pos='sen',
             votes=dict(county_a=460, county_b=500, county_d=350)),
    ],
)
data = [[candidate['name'], json.dumps(candidate)] for candidate in (d1, d2)]
all_dfs['ballots'] = DF(data, columns=['candidate_name', 'json'])

## Create Tables

In [9]:
# run this cell to create local-elections-finance database
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# CREATE DATABASE cannot run inside a transaction block, so the connection is
# switched to autocommit first. In autocommit mode there is no open
# transaction, so no commit or rollback is needed afterwards.
db.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = db.cursor()
try:
    cur.execute('create database "local-elections-finance"')
except pg.ProgrammingError as e:
    # SQLSTATE 42P04 is duplicate_database: the database is already there,
    # which is fine when re-running the notebook. Anything else is a real
    # failure and must not be hidden.
    if e.pgcode != '42P04':
        raise

In [10]:
# connect to local-elections-finance
# Rebinds the module-level `db` and `cur`, so the helpers that use
# `global cur` (to_df, insert_df, create_table) now run against this database.
db = pg.connect(host='0.0.0.0', dbname='local-elections-finance', port='5432', user='postgres')
cur = db.cursor()

In [11]:
all_qs = []  # holds create table statements

# creates a table from slightly altered markdown (the string)
def create_table(string, name):
    """ Build and execute a CREATE TABLE IF NOT EXISTS statement.

        string : markdown bullet list with one column definition per line,
                 e.g. "* **committee_id** int"; every '*' is removed and
                 what is left on each line is used as the column definition
        name : name of the table to create

        The generated statement is appended to `all_qs` before it is executed
        on the module-level cursor `cur`. Nothing is committed here.
    """
    global cur, db, all_qs
    lines = [l.strip() for l in string.replace('*', '').strip().split('\n')]
    # Blank lines in the markdown would otherwise become empty column
    # definitions ("a int, , b int") and make the statement invalid.
    lines = [l for l in lines if l]
    start = "CREATE TABLE IF NOT EXISTS {} (".format(name)
    end = ")"
    q = start + ', '.join(lines) + end
    all_qs.append(q)
    cur.execute(q)
    print('Created table', name.upper())

In [12]:
# dict to hold all schema strings from markdown
# Keys are table names; each value is a markdown bullet list with one
# "* **column_name** sql_type [constraint]" entry per line, which create_table
# turns into a CREATE TABLE statement after removing the asterisks.
schemas = {
    'committee_history': """* **committee_id** int
* **committee_name** varchar(255)
* **committee_description** varchar(512)
* **effective** date
* **expiration** date
* **filing_type** varchar(32)""",
    'committees_list': """* **id** int primary key
* **filer_name** varchar(255)
* **filer_description** varchar(255)""",
    'election_activity': """* **election** varchar(32)
* **committee_id** int
* **active_date** date
* **status** varchar(8)
* **active_reason** varchar(255)""",
    'statement_of_org': """* **committee_id** int primary key
* **committee_name** varchar(255)
* **candidate_address** varchar(255)
* **committee_acronym** varchar(32)
* **committee_address** varchar(255)
* **committee_campaign_phone** varchar(32)
* **committee_filing_effective_from** varchar(255)
* **committee_filing_type** varchar(10)
* **committee_pac_type** varchar(32)
* **election_office** varchar(255)
* **email_address** varchar(255)
* **employer** varchar(255)
* **fax** varchar(32)
* **home_phone** varchar(32)
* **mailing_address** varchar(255)
* **name** varchar(255)
* **occupation** varchar(255)
* **party_affiliation** varchar(11)
* **treasurer_email_address** varchar(255)
* **treasurer_fax** varchar(32)
* **treasurer_home_phone** varchar(32)
* **treasurer_mailing_address** varchar(255)
* **treasurer_name** varchar(255)
* **treasurer_work_phone** varchar(32)
* **work_phone** varchar(32)""",
    'transactions': """* **transaction_id** int primary key
* **committee_id** int
* **transaction_date** date
* **status** varchar(32)
* **filer_committee** varchar(255)
* **contributor_payee** varchar(255)
* **transaction_subtype** varchar(255)
* **amount** numeric""",
    'transaction_details': """* **transaction_id** int primary key
* **payee_id** int
* **donor_id** int
* **address** varchar(255)
* **address_book_type** varchar(32)
* **agent** varchar(64)
* **aggregate** numeric
* **amount** numeric
* **associations** varchar(1024)
* **description** varchar(255)
* **due_date** timestamp
* **employer_name** varchar(255)
* **filed_date** timestamp
* **name** varchar(255)
* **occupation** varchar(255)
* **occupation_letter_date** date
* **payer_of_personal_expenditure** varchar(255)
* **payment_method** varchar(32)
* **process_status** varchar(32)
* **purpose** varchar(255)
* **repayment_schedule** varchar(64)
* **transaction_date** date
* **transaction_sub_type** varchar(255)
* **transaction_type** varchar(32)""",
    'donor': """* **donor_id** int primary key
* **donor_name** varchar(255)
* **donor_address** varchar(255)
* **donor_type** varchar(32)
* **donor_cats** jsonb""",
    'payee': """* **payee_id** int primary key
* **payee_name** varchar(255)""",
    'ballots': """* **candidate_name** varchar(255) primary key
* **json** jsonb"""}

In [13]:
# Longest string in each object-dtype column of committee_history
# (presumably to check the varchar widths declared in `schemas` -- confirm).
all_dfs['committee_history'].select_dtypes(include=['O']).apply(lambda x: x.str.len()).max()

committee_name            79.0
committee_description    318.0
effective                 10.0
expiration                10.0
filing_type                9.0
dtype: float64

In [14]:
# Longest string in each object-dtype column of transaction_details
# (presumably to check the varchar widths declared in `schemas` -- confirm).
all_dfs['transaction_details'].select_dtypes(include=['O']).apply(lambda x: x.str.len()).max()

address                           90.0
address_book_type                 28.0
agent                             28.0
associations                     954.0
description                      179.0
due_date                          22.0
employer_name                     81.0
filed_date                        22.0
name                             110.0
occupation                        84.0
occupation_letter_date            10.0
payer_of_personal_expenditure     48.0
payment_method                    25.0
process_status                    22.0
purpose                          191.0
repayment_schedule                38.0
transaction_date                  10.0
transaction_sub_type              39.0
transaction_type                  24.0
dtype: float64

### Loop & Create Tables

In [15]:
# Rebuild every table from scratch: drop it if present, recreate it from its
# markdown schema, and commit. A failure is printed and rolled back so the
# remaining tables are still attempted.
for table_name, schema_md in schemas.items():
    try:
        drop_stmt = 'drop table if exists {}'.format(table_name)
        cur.execute(drop_stmt)
        create_table(schema_md, table_name)
        db.commit()
    except Exception as err:
        print(err)
        db.rollback()

Created table TRANSACTION_DETAILS
Created table DONOR
Created table TRANSACTIONS
Created table COMMITTEES_LIST
Created table STATEMENT_OF_ORG
Created table COMMITTEE_HISTORY
Created table BALLOTS
Created table ELECTION_ACTIVITY
Created table PAYEE


## Clean Data

In [16]:
def tofloat(x):
    """ Convert a currency-like value to a float.

        x : a string such as '$1,234.50', '-$5.00' or '12', or a number

        Dollar signs, thousands separators and surrounding whitespace are
        removed from strings before conversion; numbers are converted
        directly. Returns None when the value is missing (None or NaN) or
        cannot be parsed.
    """
    if isinstance(x, str):
        x = x.strip().replace('$', '').replace(',', '')
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        return None
    # NaN is the only float that is not equal to itself; treat it as missing.
    if value != value:
        return None
    return value

**Transactions**

In [17]:
# Column labels of the transactions table, in the order the cursor reports them.
db_cols = to_df('select * from transactions').columns

In [18]:
# normalise headers: lower-case, with spaces and slashes turned into underscores
all_dfs['transactions'].columns = [
    header.lower().translate(str.maketrans(' /', '__'))
    for header in all_dfs['transactions']
]
# rename a few headers, then put the columns in the same order as the DB table
all_dfs['transactions'] = all_dfs['transactions'].rename(
    columns={'tran_id': 'transaction_id',
             'tran_date': 'transaction_date',
             'sub_type': 'transaction_subtype'}
)[db_cols]

In [19]:
# run every amount through tofloat (which returns None for values it cannot parse)
all_dfs['transactions'].loc[:, 'amount'] = all_dfs['transactions']['amount'].apply(tofloat)

**Statement of Org**

In [20]:
# Column labels of the statement_of_org table, in the order the cursor reports them.
db_cols = to_df('select * from statement_of_org').columns

In [21]:
# normalise headers: lower-case, with spaces and slashes turned into underscores
all_dfs['statement_of_org'].columns = [
    header.lower().translate(str.maketrans(' /', '__'))
    for header in all_dfs['statement_of_org']
]

# put the columns in the same order as the DB table
all_dfs['statement_of_org'] = all_dfs['statement_of_org'][db_cols]

**Committees List**

In [22]:
# Column labels of the committees_list table, in the order the cursor reports them.
db_cols = to_df('select * from committees_list').columns

In [23]:
# Display the committees_list column labels as a plain list.
db_cols.tolist()

['id', 'filer_name', 'filer_description']

In [24]:
# normalise headers: lower-case, with spaces turned into underscores
all_dfs['committees_list'].columns = [
    '_'.join(header.lower().split(' '))
    for header in all_dfs['committees_list']
]

# put the columns in the same order as the DB table
all_dfs['committees_list'] = all_dfs['committees_list'][db_cols]

## Insert Data

In [25]:
# Load every dataframe into its table, one transaction per table.
for name, df in all_dfs.items():
    try:
        insert_df(df, name)
        # Commit each table as soon as it succeeds. Otherwise the rollback
        # triggered by a later table's failure would also discard these rows,
        # and nothing would be persisted once the connection closes.
        db.commit()
    except Exception as e:
        db.rollback()
        print(name)
        print(e)
        print()

Inserted 125 rows into statement_of_org
Inserted 1 rows into donor
Inserted 88310 rows into transactions
Inserted 1374 rows into committee_history
Inserted 88310 rows into transaction_details
Inserted 1617 rows into committees_list
Inserted 2 rows into ballots
Inserted 2056 rows into election_activity
Inserted 1 rows into payee
