In [4]:
#INITIALIZE EVERYTHING, FUNCTIONS, ETC.

import pandas as pd
import requests
import config
import datadotworld as dw
import pygsheets
import http.client
import json
import time
from pandas.io.json import json_normalize

##Schedule_a API guide: https://api.open.fec.gov/developers/#/receipts/get_schedules_schedule_a_˜

##Define functions
def get_cands(**kwargs):
    state = kwargs.get('state')
    cycle = kwargs.get('cycle')
    url = 'https://api.open.fec.gov/v1/candidates/search'

    params = {'election_year': cycle
              , 'state': state
              , 'api_key': config.fec_key
              , 'is_active_candidate': True
              , 'has_raised_funds': True
              }

    params = {k: v for k, v in params.items() if v}

    def cand_req():

        cand_all = []
        r = requests.get(url, params=params).json()
        page = r['pagination']['page']

        while page <= r['pagination']['pages']:

            cands = json_normalize(data=r['results'])[['candidate_id'
                , 'name'
                , 'party_full'
                , 'incumbent_challenge_full'
                , 'office_full'
                , 'first_file_date'
                                                       ]]
            comm = json_normalize(data=r['results'], record_path='principal_committees')
            comm = comm[['candidate_ids'
                , 'committee_id'
                , 'name']]

            comm['candidate_ids'] = comm['candidate_ids'].str[0]

            # Merge candidate and committee lookups
            cands = cands.merge(comm, left_on='candidate_id', right_on='candidate_ids')

            # Rename cols
            colnm = {
                'name_x': 'candidate_name'
                , 'name_y': 'committee_name'
            }

            cands.rename(columns=colnm, inplace=True)
            cands.drop(columns='candidate_ids', inplace=True)

            cand_all.append(cands)

            page+=1

            params.update(page=page)

            r = requests.get(url, params=params).json()

        return cand_all

    if cycle:

        for state in state:

            params['state'] = state

            for cycle in cycle:

                cand_all = cand_req()

    else:

        for state in state:

            params['state'] = state

            cand_all = cand_req()

    cand_all = pd.concat(cand_all, sort=False, ignore_index=True).drop_duplicates().reset_index(drop=True)

    return cand_all

def get_committees(names):
    names = names.split(',')

    comms = []
    end = 'https://api.open.fec.gov/v1/names/committees'

    for name in names:
        params = {'q': name
            , 'api_key': config.fec_key}

        r = requests.get(end, params=params).json()

        comm = json_normalize(data=r['results'])

        comms.append(comm)

    comm_all = pd.concat(comms, sort=False, ignore_index=True).drop_duplicates()

    return comm_all


def get_itemized(cycle, cands):

    def get_unitem(cycle, commid):

        end = 'https://api.open.fec.gov/v1/committee/'
        params = {
            'api_key': config.fec_key
            , 'cycle': cycle
            , 'per_page': '100'
            , 'committee_id': commid
        }

        params = {k: v for k, v in params.items() if v}

        # Collect unitemized contributions
        r = requests.get(end + commid + '/totals', params=params).json()
        udf = json_normalize(r['results'])
        return udf

    end = 'https://api.open.fec.gov/v1/schedules/schedule_a/'
    ids = cands['committee_id']
    dfs = []
    udfs = []
    page_count = 0
    cand_count = len(cands)

    for idx, commid in enumerate(ids):

        for_start = time.time()
        r_count = 0

        item_page = 0

        params = {
            'per_page': '100'
            , 'sort': 'contribution_receipt_date'
            , 'api_key': config.fec_key
            , 'is_individual': 'true'
            , 'two_year_transaction_period': cycle
            , 'last_index': []
            , 'last_contribution_receipt_date': []
            , 'committee_id': commid
        }
        params = {k: v for k, v in params.items() if v}

        try:
            udfs.append(get_unitem(cycle, commid))
        except:
            continue

        # Initialize Schedule A request
        r = requests.get(end, params=params).json()

        try:
            pages = r['pagination']['pages']
        except:
            pages=0

        r_count += 1

        candidate = cands['candidate_name'][idx]
        print(f'Loading contributions for {candidate}')

        try:
            while r['pagination']['last_indexes'] is not None:
                df = json_normalize(r['results'])
                dfs.append(df)

                last_index = r['pagination']['last_indexes']['last_index']
                last_date = r['pagination']['last_indexes']['last_contribution_receipt_date']

                params.update([('last_index', last_index)
                              , ('last_contribution_receipt_date', last_date)])

                r = requests.get(end, params=params).json()
                r_count += 1
                page_count += 1
                item_page += 1

                for_duration = time.time() - for_start
                r_rate = r_count / for_duration
                sleep = abs(r_rate - 1)

                if r_rate >= 1:
                    print(f'Hit rate {r_rate} on {candidate} page {page_count}')
                    time.sleep(sleep)

        except:
            print(f'Broke on page {item_page} for {candidate}.')
            print(f'Last index: {last_index} //n Last date: {last_date} //n commid: {commid}')
            print(f'Reached page {item_page} of {pages} for {candidate}.')
            time.sleep(sleep)

    print(f'{page_count} pages for {cand_count} candidates')

    # After for loop, concatenate dfs
    df = pd.concat(dfs, sort=False, ignore_index=True).drop_duplicates(subset='transaction_id')
    ustore = pd.concat(udfs, sort=False, ignore_index=True).drop_duplicates()

    # Clean dataframe
    df['contributor_zip'] = df['contributor_zip'].str[:5]

    # Filter to is_individual and no memoed subtotal
    df = df[(df['is_individual'] == True) | (df['memoed_subtotal'] == False)]

    # Transform unitemized table, based on df structure
    cols = df.columns.values.tolist()
    udf = pd.DataFrame(columns=cols)

    targetcols = ['committee.name'
        , 'committee.party_full'
        , 'committee_id'
        , 'contribution_receipt_amount'
        , 'contribution_receipt_date'
        , 'fec_election_type_desc']

    sourcecols = ['committee_name'
        , 'party_full'
        , 'committee_id'
        , 'individual_unitemized_contributions'
        , 'coverage_end_date'
        , 'last_report_type_full']

    udf[targetcols] = ustore[sourcecols]

    # Add labels
    udf['contributor_name'] = 'Unitemized individual contributions'
    udf['entity_type'] = 'IND'

    # Combine dataframes
    df = pd.concat([df, udf], sort=False, ignore_index=True)

    # Parse datetime
    df['contribution_receipt_date'] = df['contribution_receipt_date'].str.split('T', expand=True)[0]

    return df


def get_ies(cycle, cands):

    end = 'https://api.open.fec.gov/v1/schedules/schedule_e/'
    ids = cands['candidate_id']
    dfs = []
    page_count = 0

    for idx, item in enumerate(ids):

        for_start = time.time()
        r_count = 0

        params = {
            'per_page': '100'
            , 'api_key': config.fec_key
            , 'cycle': cycle
            , 'last_index': []
            , 'last_expenditure_date': []
            , 'candidate_id': item
        }

        params = {k: v for k, v in params.items() if v}

        r = requests.get(end, params=params).json()

        candidate = cands['candidate_name'][idx]

        print(f'Loading IEs for {candidate}')

        try:
            pages = str(r['pagination']['pages'])
        except:
            pages = 0

        if pages == 0:
            try:
                df = json_normalize(r['results'])
                dfs.append(df)
            except:
                df = []

        else:
            while r['pagination']['last_indexes'] is not None:
                df = json_normalize(r['results'])
                dfs.append(df)

                last_index = r['pagination']['last_indexes']['last_index']
                last_date = r['pagination']['last_indexes']['last_expenditure_date']

                params.update([('last_index', last_index)
                                  , ('last_expenditure_date', last_date)])

                r = requests.get(end, params=params).json()
                r_count += 1
                page_count += 1

                for_duration = time.time() - for_start
                r_rate = r_count / for_duration
                sleep = abs(r_rate - 1)

                if r_rate >= 1:
                    print(f'Hit rate {r_rate} on {candidate} page {page_count}')
                    time.sleep(1)

    # After for loop, concatenate dfs
    df = pd.concat(dfs, sort=False, ignore_index=True).drop_duplicates(subset='transaction_id')

    # Clean dataframe
    df['commiteee.zip'] = df['committee.zip'].str[:5]
    df['expenditure_date'] = df['expenditure_date'].str.split('T', expand=True)[0]

    return df

def get_coordinated(cycle, cands):

    end = 'https://api.open.fec.gov/v1/schedules/schedule_f/'
    ids = cands['candidate_id']
    dfs = []
    page_count = 0

    for i, item in enumerate(ids):

        params = {
            'per_page': '100'
            , 'api_key': config.fec_key
            , 'two_year_transaction_period': cycle
            , 'page': 1
            , 'candidate_id': item
        }
        params = {k: v for k, v in params.items() if v}

        r = requests.get(end, params=params).json()

        cur_page = r['pagination']['page']
        all_pgs = r['pagination']['pages']

        candidate = cands['candidate_name'][i]

        for page in range(all_pgs):
            for_start = time.time()
            r_count = 0

            params.update([('page', page + 1)])
            r = requests.get(end, params=params).json()

            r_count += 1
            page_count += 1

            for_duration = time.time() - for_start
            r_rate = r_count / for_duration
            sleep = abs(r_rate - 1)

            if r_rate >= 1:
                print(f'Hit rate {r_rate} on {candidate} page {page_count}')
                time.sleep(sleep)

            df = json_normalize(r['results'])
            dfs.append(df)

    # After for loop, concatenate dfs
    df = pd.concat(dfs, sort=False, ignore_index=True).drop_duplicates(subset='transaction_id')

    # Clean dataframe
    df['commiteee.zip'] = df['committee.zip'].str[:5]

    return df


def get_summary(cycle, cands):

    end = 'https://api.open.fec.gov/v1/committee/'
    ids = cands['committee_id']
    dfs = []

    for idx, id in enumerate(ids):
        params = {
            'api_key': config.fec_key
            , 'cycle': cycle
            , 'per_page': '100'
        }
        params = {k: v for k, v in params.items() if v}

        r = requests.get(end + id + '/totals', params=params).json()
        try:
            df = json_normalize(r['results'])
        except:
            continue

        dfs.append(df)

    # After for loop, concatenate dfs
    df = pd.concat(dfs, sort=False, ignore_index=True).drop_duplicates()

    return df

def write_cands(df):
    results = dw.query('darrenfishell/2020-election-repo'
                       , 'SELECT * FROM candidate_committee_lookup').dataframe

    merged_df = pd.concat([results, df]).drop_duplicates(subset=['committee_id']).reset_index(drop=True)

    oldlen = len(results)
    newlen = len(merged_df)

    with dw.open_remote_file('darrenfishell/2020-election-repo', 'candidate_committee_lookup.csv') as w:
        merged_df.to_csv(w, index=False)

    return oldlen, newlen

def write_indiv(df):
    results = dw.query('darrenfishell/2020-election-repo'
                       , 'SELECT * FROM individual_congressional_contributions').dataframe

    subset = ['contribution_receipt_date', 'committee_id', 'transaction_id']

    merged_df = pd.concat([results, df], sort=False, ignore_index=True).drop_duplicates(subset=subset)

    oldlen = len(results)
    newlen = len(merged_df)

    test = oldlen < newlen

    if test:
        with dw.open_remote_file('darrenfishell/2020-election-repo', 'individual-congressional-contributions.csv') as w:
            merged_df.to_csv(w, index=False)
    return test, oldlen, newlen


def write_summary(df):
    results = dw.query('darrenfishell/2020-election-repo'
                       , 'SELECT * FROM congress_financial_summaries').dataframe

    merged_df = pd.concat([df, results], sort=False, ignore_index=True).drop_duplicates()

    oldcash = sum(results['receipts'])
    oldlen = len(results)
    newcash = sum(merged_df['receipts'])
    newlen = len(merged_df)

    test = oldcash < newcash

    if test:
        with dw.open_remote_file('darrenfishell/2020-election-repo', 'congress_financial_summaries.csv') as w:
            merged_df.to_csv(w, index=False)
    return test, oldlen, newlen


def write_ies(df):
    results = dw.query('darrenfishell/2020-election-repo'
                       , 'SELECT * FROM congress_independent_expenditures').dataframe

    subset = ['transaction_id', 'candidate_id', 'filing_date']

    merged_df = pd.concat([df, results], sort=False, ignore_index=True).drop_duplicates(subset=subset)

    oldlen = len(results)
    newlen = len(merged_df)

    test = oldlen < newlen

    if test:
        with dw.open_remote_file('darrenfishell/2020-election-repo', 'congress-independent-expenditures.csv') as w:
            merged_df.to_csv(w, index=False)
    return test, oldlen, newlen


def write_coord(df):
    results = dw.query('darrenfishell/2020-election-repo'
                       , 'SELECT * FROM congress_party_coordinated_expenditures').dataframe

    subset = ['transaction_id', 'candidate_id', 'expenditure_date']

    merged_df = pd.concat([df, results], sort=False, ignore_index=True).drop_duplicates(subset=subset)

    oldlen = len(results)
    newlen = len(merged_df)

    test = oldlen < newlen

    if test:
        with dw.open_remote_file('darrenfishell/2020-election-repo',
                                 'congress-party-coordinated-expenditures.csv') as w:
            merged_df.to_csv(w, index=False)
    return test, oldlen, newlen


def write_to_gsheet():
    gc = pygsheets.authorize(service_file='gcreds.json')
    conn = http.client.HTTPSConnection("api.data.world")
    headers = {'authorization': "Bearer " + config.dw_key}

    sheets_to_dw = [['maine-congress-2020', 'e2b1bde2-1e60-4d49-bd31-da5aa7ce0611', 1],
                    ['maine-congress-2020', '026e8f40-d10e-4324-8b45-80dbc0e61627', 0]]

    for idx, sheet in enumerate(sheets_to_dw):
        sheet = [x[0] for x in sheets_to_dw][idx]
        queryid = [x[1] for x in sheets_to_dw][idx]
        gsh_idx = [x[2] for x in sheets_to_dw][idx]

        # Retrieve query
        conn.request("GET", "/v0/queries/" + queryid, headers=headers)
        data = conn.getresponse().read()
        # Execute Query
        results = dw.query('darrenfishell/2020-election-repo', json.loads(data)['body']).dataframe

        # Prepare to load into Google Sheets
        sh = gc.open(sheet)
        wks = sh.worksheet('index', gsh_idx)
        wks.clear()
        wks.rows = results.shape[0]
        wks.set_dataframe(results, start='A1', nan='')

In [3]:
##MAIN.PY
import time

def lambda_handler(event, context):
    start = time.time()
    # Step 1: Set state(s) and cycle(s) for candidate search
    state = ['ME']
    cycle = ['2020']

    # Query candidates and write to data.world
    try:
        cands = get_cands(state=state, cycle=cycle)
        oldlen, newlen = write_cands(cands)

        print(f'Wrote {newlen} candidate records. Prior load had {oldlen} records.')

    except:
        print('Candidate lookup query failed.')

    # Get - write contribution pairs
    getwrite = {
        get_itemized: write_indiv,
        get_summary: write_summary,
        get_ies: write_ies,
        get_coordinated: write_coord
    }

    # Filename - input pairs
    files_input = {
        'itemized contributions': [cycle, cands],
        'campaign summary': [cycle, cands],
        'independent expenditures': [cycle, cands],
        'party coordinated expenditures': [cycle, cands]
    }

    # List of functions, filenames and inputs to unpack
    files = [x[0] for x in list(files_input.items())]
    params = [x[1] for x in list(files_input.items())]

    # Iterate over all get-write functions, with TRY
    for idx, (get, write) in enumerate(getwrite.items()):

        # Set filename
        file = files[idx]

        print(f'getwrite index: {idx}')
        # Run function r, return dataframes
        df = get(*params[idx])

        # Execute write functions to write to datadotworld
        newtest, oldlen, newlen = write(df)
        newrecords = newlen

        if newtest:
            print(f'Wrote {newrecords} new records to {file}, which had {oldlen}.')
        else:
            print(f'No update to {file}, which has {newlen} records.')

    end = time.time()
    duration = end - start
    print(f'Script ran for {duration}')

#     try:
#         write_to_gsheet()
#         print('Wrote to GSheets')
#     except:
#         print('Failed to write to GSheet')

##TESTING
test_event = {
  "key1": "value1",
  "key2": "value2",
  "key3": "value3"
}
event = []

lambda_handler(event, test_event)

Candidate lookup query failed.


UnboundLocalError: local variable 'cands' referenced before assignment

In [20]:
results = dw.query('darrenfishell/2020-election-repo'
                       , 'SELECT * FROM individual_congressional_contributions').dataframe

subset = ['contribution_receipt_date', 'committee_id', 'transaction_id']

print(len(results))

results.drop_duplicates(subset=subset, inplace=True)

print(len(results))

225584
137005


In [12]:
print(len(results.drop_duplicates(subset=subset)))

136754


In [21]:
with dw.open_remote_file('darrenfishell/2020-election-repo', 'individual-congressional-contributions.csv') as w:
    r_df.to_csv(w, index=False)

KeyboardInterrupt: 

In [23]:
results = dw.query('darrenfishell/2020-election-repo'
                       , 'SELECT * FROM candidate_committee_lookup').dataframe

with dw.open_remote_file('darrenfishell/maine-federal-campaign-finance-tables', 'candidate_committee_lookup.csv') as w:
    results.to_csv(w, index=False)

In [18]:
try:
    write_to_gsheet()
    print('Wrote to GSheets')
except:
    print('Failed to write to GSheet')

Wrote to GSheets
