In [1]:
%%writefile ../athena_example/config.py
# ---------------------------------------------------------------------------
# Athena connection settings
# ---------------------------------------------------------------------------
# Credentials are left as None here; presumably pyathena/boto3 then resolve
# credentials from the standard AWS credential sources — confirm for your setup.
ACCESS_KEY_ID = None
SECRET_ACCESS_KEY = None

# S3 prefix handed to pyathena as s3_staging_dir (where query results land).
ATHENA_GARBAGE_PATH = "s3://com.ria.scratch/athena_garbage/"
WORKGROUP = "RIA"
REGION = "eu-west-1"
SCHEMA_NAME = "ria_data_science_s3"

# ---------------------------------------------------------------------------
# Storage location for the example tables' data files
# ---------------------------------------------------------------------------
DATABASE_BUCKET = "com.ria.scratch"
# Key prefix inside DATABASE_BUCKET; keep the trailing "/" because table
# folder names are appended to it directly (e.g. DATABASE_ROOT_KEY + 'raw_table').
DATABASE_ROOT_KEY = "as-dedupe/"

Overwriting ../athena_example/config.py


In [7]:
%%writefile ../athena_example/athena_example.py
#!/usr/bin/python
"""
This is a setup script for athena_example. It downloads a zip file of
Illinois campaign contributions, cleans and lower-cases it with pandas,
uploads the cleaned file to S3 and builds a set of Athena tables from it.

__Note:__ Run this script before the example that consumes these tables.
TODO(review): this note originally linked to
[athena_example.py](athena_example.py), which is this script itself;
the intended follow-up script should be confirmed and linked here.

Tables created:
* raw_table - external table over the cleaned, tab-separated CSV file
* donors - all distinct donors based on name, address, employer and occupation
* recipients - all distinct campaign contribution recipients
* contributions - contribution amounts tied to donor and recipients tables
* processed_donors - donors with lower-cased fields and the name and
  address parts concatenated into single columns
"""

import os
import zipfile
import warnings
import pandas as pd
import numpy as np
from urllib.request import urlopen
import boto3
from pyathena import connect
import config
import csv


contributions_zip_file = 'Illinois-campaign-contributions.txt.zip'
contributions_txt_file = 'Illinois-campaign-contributions.txt'
contributions_url = 'https://s3.amazonaws.com/dedupe-data/Illinois-campaign-contributions.txt.zip'

# Download the archive only if there is no local copy yet.
if not os.path.exists(contributions_zip_file):
    print('downloading', contributions_zip_file, '(~60mb) ...')
    # Stream into a ".part" file and rename it only once the download has
    # finished: an interrupted run then cannot leave a truncated zip behind
    # that the os.path.exists() check above would accept on the next run.
    # The with-statement closes both the HTTP response and the local file
    # even if an error occurs mid-transfer.
    partial_zip_file = contributions_zip_file + '.part'
    with urlopen(contributions_url) as response, open(partial_zip_file, 'wb') as local_file:
        # Copy in 1 MiB chunks instead of holding the whole archive in memory.
        for chunk in iter(lambda: response.read(1024 * 1024), b''):
            local_file.write(chunk)
    os.replace(partial_zip_file, contributions_zip_file)

# Extract the .txt member(s) of the archive unless already extracted.
if not os.path.exists(contributions_txt_file):
    with zipfile.ZipFile(contributions_zip_file, 'r') as zip_file:
        print('extracting %s' % contributions_zip_file)
        for member in zip_file.namelist():
            if '.txt' in member:
                zip_file.extract(member)



def as_pandas(query, **kwargs):
    """Run ``query`` through ``utils.athena_to_panda`` and return its result.

    Fixed reader options are always passed: backslash as escape character,
    ``dtype='object'``, ``keep_default_na=False`` and ``na_values=['']``
    (presumably forwarded to ``pandas.read_csv`` — confirm in ``utils``).
    Any extra keyword arguments are forwarded unchanged; repeating one of the
    fixed options raises ``TypeError`` (duplicate keyword argument).

    NOTE(review): ``utils`` is never imported in this script, and this helper
    is not called anywhere below — confirm it is still needed.
    """
    fixed_options = dict(escapechar='\\', dtype='object',
                         keep_default_na=False, na_values=[''])
    return utils.athena_to_panda(query, **fixed_options, **kwargs)

# Open a PyAthena connection and cursor using the settings from config.py.
# NOTE(review): neither `conn` nor the cursor `c` is used anywhere below (all
# queries go through `utils`), so this block may be removable — confirm.
athena_connection_kwargs = dict(
    aws_access_key_id=config.ACCESS_KEY_ID,
    aws_secret_access_key=config.SECRET_ACCESS_KEY,
    s3_staging_dir=config.ATHENA_GARBAGE_PATH,
    region_name=config.REGION,
    work_group=config.WORKGROUP,
)
conn = connect(**athena_connection_kwargs)
c = conn.cursor(schema_name=config.SCHEMA_NAME)

print('importing raw data from csv...')

# Drop every table this script (re)creates, in this order, so it can be rerun.
# FIXME(review): `utils` is used throughout this script but never imported;
# presumably a project-local helper module — add the import.
# NOTE(review): if athena_start_query only *starts* a query without waiting
# for it to finish, later DROP/CREATE statements could race — confirm.
for table_name in ('raw_table', 'donors', 'recipients', 'contributions', 'processed_donors'):
    utils.athena_start_query("DROP TABLE IF EXISTS " + table_name)


# raw_table is an external table over the tab-separated file that the pandas
# step below uploads under s3://<DATABASE_BUCKET>/<DATABASE_ROOT_KEY>raw_table.
# Delimited text is mapped to columns by position, so this column list must
# stay in the same order as the uploaded file's columns. The file's header row
# is skipped ('skip.header.line.count') and empty fields are read as NULL
# ('serialization.null.format'=''), which the later joins must account for.
# The raw string keeps '\t' and '\\' as literal escapes for Athena to interpret.
# LOCATION ends with '/' because the Athena CREATE TABLE docs ask for a
# trailing slash when pointing at a folder.
q=r'''
CREATE EXTERNAL TABLE raw_table 
    (reciept_id INT, last_name VARCHAR(70), first_name VARCHAR(35), 
    address_1 VARCHAR(35), address_2 VARCHAR(36), city VARCHAR(20), 
    state VARCHAR(15), zip VARCHAR(11), report_type VARCHAR(24), 
    date_recieved VARCHAR(10), loan_amount VARCHAR(12), 
    amount VARCHAR(23), receipt_type VARCHAR(23), 
    employer VARCHAR(70), occupation VARCHAR(40), 
    vendor_last_name VARCHAR(70), vendor_first_name VARCHAR(20), 
    vendor_address_1 VARCHAR(35), vendor_address_2 VARCHAR(31), 
    vendor_city VARCHAR(20), vendor_state VARCHAR(10), 
    vendor_zip VARCHAR(10), description VARCHAR(90), 
    election_type VARCHAR(10), election_year VARCHAR(10), 
    report_period_begin VARCHAR(10), report_period_end VARCHAR(33), 
    committee_name VARCHAR(70), committee_id VARCHAR(37)) 
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'  
  ESCAPED BY '\\'
LOCATION
    's3://{}/{}/' 
TBLPROPERTIES (
    'classification'='csv', 
    'skip.header.line.count'='1',  
    'serialization.null.format'='')
'''.format(config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY+'raw_table') 
utils.athena_start_query(q)


def _bad_line_options():
    """Return ``read_csv`` keyword arguments that warn about and skip malformed lines.

    pandas 1.3 deprecated ``error_bad_lines``/``warn_bad_lines`` in favour of
    ``on_bad_lines`` and pandas 2.0 removed them, so pick the spelling the
    installed pandas version understands.
    """
    major, minor = (int(part) for part in pd.__version__.split('.')[:2])
    if (major, minor) >= (1, 3):
        return {'on_bad_lines': 'warn'}
    return {'error_bad_lines': False, 'warn_bad_lines': True}


# Read every column as str; with keep_default_na=False only truly empty fields
# become NaN (pandas' other default NA strings are kept as plain text).
contributions_raw = pd.read_csv(contributions_txt_file, sep='\t', escapechar='\\',
                                quoting=csv.QUOTE_NONE, dtype=str,
                                keep_default_na=False, na_values=[''],
                                **_bad_line_options())

# Remove the very few records that mess up the demo
# (demo purposes only! Don't do something like this in production).
# .copy() makes an independent frame, so the .loc assignments below do not
# write into a slice of contributions_raw (SettingWithCopyWarning).
contributions_clean = contributions_raw[contributions_raw['RcvDate'].str.len() >= 10].copy()

# Null out non-empty report-period dates shorter than 10 characters: they are
# too short to be a full mm/dd/yyyy value for date_parse() in Athena.
for date_column in ('RptPdBegDate', 'RptPdEndDate'):
    too_short = contributions_clean[date_column].str.len() < 10
    contributions_clean.loc[too_short, date_column] = np.nan

# Committee ID is required. Empty fields were already parsed as NaN
# (na_values=['']), so test for missing values; comparing with '' never matches.
contributions_clean = contributions_clean[contributions_clean['ID'].notna()]

# There's a record with a date stuck in the committee_id column, which causes
# problems when inserting into the contributions table below. Get rid of it this
# way.
contributions_clean = contributions_clean[contributions_clean['ID'].str.len() <= 9]

# Drop the unnamed 30th column (presumably produced by a trailing tab on each
# line) so the file has exactly the 29 columns raw_table declares.
contributions_clean = contributions_clean.drop(columns='Unnamed: 29')

# Lower-case every value; all columns hold strings because of dtype=str.
contributions_lower = contributions_clean.apply(lambda column: column.str.lower())

# Upload into raw_table's LOCATION prefix: s3://<bucket>/<root key>raw_table/<file>.
# Built with plain string formatting, like the LOCATION itself, instead of
# os.path.join, whose separator is OS-dependent (backslash on Windows).
# NaN values are written as empty fields, which raw_table reads back as NULL.
# FIXME(review): `utils` is never imported in this script — add the import.
raw_table_uri = 's3://{}/{}raw_table/{}'.format(
    config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY, contributions_txt_file)
utils.write(body=contributions_lower.to_csv(quoting=csv.QUOTE_NONE, sep='\t',
                                            escapechar='\\', index=False),
            filename=raw_table_uri)

print('creating donors table...')

# One row per distinct combination of TRIM()med name, address, employer and
# occupation values from raw_table, numbered with row_number() as donor_id.
donors_query = '''
CREATE TABLE donors as
    with tmp as
      (SELECT DISTINCT 
           TRIM(last_name) as last_name, TRIM(first_name) as first_name, 
           TRIM(address_1) as address_1, TRIM(address_2) as address_2, 
           TRIM(city) city, TRIM(state) as state, 
           TRIM(zip) as zip, TRIM(employer) as employer, 
           TRIM(occupation) as occupation
      FROM raw_table)
    SELECT row_number() over () as donor_id, * from tmp'''
utils.athena_start_query(donors_query)


# Distinct (committee_id, committee_name) pairs as they appear in raw_table.
recipients_query = '''
CREATE TABLE recipients as
    SELECT DISTINCT committee_id, committee_name FROM raw_table
'''
utils.athena_start_query(recipients_query)

print('creating contributions table')

# Attach each raw contribution to its donor_id by matching the same TRIM()med
# columns the donors table was built from.
# raw_table reads empty fields as NULL ('serialization.null.format'=''), and a
# plain `a = b` is never true for NULL = NULL. The inner join would therefore
# silently drop every contribution whose donor has any empty field
# (e.g. no address_2). Both sides are COALESCEd to ' ' so NULLs compare equal,
# matching how SELECT DISTINCT grouped NULLs when donors was built.
# ' ' is a safe stand-in: a TRIM()med value can never be a single space, so it
# cannot collide with real data. Each condition also stays a plain equality,
# usable as an equi-join key.
q='''
CREATE TABLE contributions as
    SELECT reciept_id, donors.donor_id, committee_id, 
        report_type, date_parse(date_recieved, '%m/%d/%Y') as date_recieved, 
        loan_amount, amount, 
        receipt_type, vendor_last_name , 
        vendor_first_name, vendor_address_1, vendor_address_2, 
        vendor_city, vendor_state, vendor_zip, description, 
        election_type, election_year, 
        date_parse(report_period_begin, '%m/%d/%Y') as report_period_begin, 
        date_parse(report_period_end, '%m/%d/%Y') as report_period_end 
    FROM raw_table JOIN donors ON 
        COALESCE(donors.first_name, ' ') = COALESCE(TRIM(raw_table.first_name), ' ') AND 
        COALESCE(donors.last_name, ' ') = COALESCE(TRIM(raw_table.last_name), ' ') AND 
        COALESCE(donors.address_1, ' ') = COALESCE(TRIM(raw_table.address_1), ' ') AND 
        COALESCE(donors.address_2, ' ') = COALESCE(TRIM(raw_table.address_2), ' ') AND 
        COALESCE(donors.city, ' ') = COALESCE(TRIM(raw_table.city), ' ') AND 
        COALESCE(donors.state, ' ') = COALESCE(TRIM(raw_table.state), ' ') AND 
        COALESCE(donors.employer, ' ') = COALESCE(TRIM(raw_table.employer), ' ') AND 
        COALESCE(donors.occupation, ' ') = COALESCE(TRIM(raw_table.occupation), ' ') AND 
        COALESCE(donors.zip, ' ') = COALESCE(TRIM(raw_table.zip), ' ')'''
utils.athena_start_query(q)

# processed_donors: one row per donor with lower-cased fields. The non-NULL
# name parts are joined into `name` and the non-NULL address parts into
# `address`; each is NULL when all of its parts are NULL.
# Fix: the address array previously listed address_1 twice and never used
# address_2.
# NOTE(review): `person` is TRUE when first_name IS NULL, which reads inverted
# for a "person" flag; kept unchanged — confirm the intended meaning.
q = '''
CREATE TABLE processed_donors AS  
    SELECT donor_id,  
     LOWER(city) AS city,  
     CASE WHEN (first_name IS NULL AND last_name IS NULL) 
          THEN NULL 
          ELSE LOWER(array_join(filter(array[first_name, last_name], x-> x IS NOT NULL), ' ')) 
     END AS name,  
     LOWER(zip) AS zip,  
     LOWER(state) AS state,  
     CASE WHEN (address_1 IS NULL AND address_2 IS NULL) 
          THEN NULL 
          ELSE LOWER(array_join(filter(array[address_1, address_2], x-> x IS NOT NULL), ' '))
     END AS address,  
     LOWER(occupation) AS occupation, 
     LOWER(employer) AS employer, 
     first_name is null AS person 
 FROM donors'''
utils.athena_start_query(q)

print('done')

Overwriting ../athena_example/athena_example.py


In [8]:
!python ../athena_example/athena_example.py

importing raw data from csv...
b'Skipping line 1441352: expected 30 fields, saw 31\n'
b'Skipping line 1465996: expected 30 fields, saw 31\n'
b'Skipping line 1495732: expected 30 fields, saw 31\n'
b'Skipping line 1631504: expected 30 fields, saw 31\nSkipping line 1631506: expected 30 fields, saw 31\n'
b'Skipping line 1660260: expected 30 fields, saw 31\nSkipping line 1660264: expected 30 fields, saw 32\n'
b'Skipping line 1441352: expected 30 fields, saw 31\n'
b'Skipping line 1465996: expected 30 fields, saw 31\n'
b'Skipping line 1495732: expected 30 fields, saw 31\n'
b'Skipping line 1631504: expected 30 fields, saw 31\nSkipping line 1631506: expected 30 fields, saw 31\n'
b'Skipping line 1660260: expected 30 fields, saw 31\nSkipping line 1660264: expected 30 fields, saw 32\n'
creating donors table...
creating contributions table
done
