In [3]:
#re-imports modules that have changed
%load_ext autoreload
%autoreload 2

In [4]:
import sys
sys.path.append("../")

In [5]:
import os
import logging
import json
import datetime as dt
import time
import pandas as pd
from google.cloud import pubsub_v1

from src.voter_info_fetcher import VoterInfo
from src.utils_cloud_storage import CloudStorageClient

In [6]:
logging.basicConfig(level="INFO")

In [7]:
# Constants

In [8]:
# retrieve county level address data and set fips code as the identifer 

In [9]:
def run_voter_info(): 
    
    """
    FULL PROCESS - NOTE THIS WILL FAIL 500 - SEE BELOW FOR UPDATED INDEPENDENT FUNCTIONS. 
    Cloud function to run job to fetch voter information data for current elections
    from Google Civic Information API.
    Retrieves county level data and sets the fips code as the identifier (`geo_id`)
    Stores data in a cloud storage bucket as a json file.
    """
    
    # Job status
    logging.info("Starting job fetch voter information")
    #logging.info("""Trigger: messageId {} published at {}""".format(context.event_id, context.timestamp))
    
    # Initiate job
    civic = VoterInfo() 
    
    # Load elections, address data
    logging.info("Load list of current elections.")
    elections = civic.load_current_elections("current_elections",  "current_elections.json")
    
    logging.info("Load addreses by locale")
    locales = civic.load_address_locales("address_locales",  "addresses_county.csv")
    
    # Retrieve Voter information data
    for election in elections[0:2]:
        election_id = election['id']
        election_name = election['name']
        election_ocdid = election['ocdDivisionId']
        
        # Get state abbr from OCDid
        election_ocdid = election_ocdid.split("/")[-1].split(":")[-1].upper()

        logging.debug(f"election_id: {election_id}")
        logging.debug(f"election_ocdid: {election_ocdid}")
        logging.debug(f"election_name: {election_name}")
        
        # Subset data by OCDid
        # Except test election 
        if election_name == 'VIP Test Election': 
            continue
        # If election is national, return data for all records
        elif election_ocdid == 'US':
            active = locales.copy()
        # If election is statewide, return data for all records in state. 
        else: 
            active = locales.loc[locales['state_abbr'] == election_ocdid, :].copy()
        # Ensure active elections not null
        try: 
            assert(active.empty == False)
        except Exception as e: 
            logging.error("Unable to subset data by OCDid.")
    
            # Get voter information for election
        try: 
            logging.info(f"Start election: {election_id}:{election_ocdid}") 
            for index, row in active.iterrows():
                address = row['address']
                geo_id = row["fips"]
                response = civic.fetch_voter_info(address, election_id)
                response['geoid'] = {"fips":geo_id}
                civic.save_voter_info(geo_id, response, bucket_name="current_contests")
                time.sleep(1)
            logging.debug(f"Completed election: {election_id}:{election_ocdid}")
        except Exception as error: 
            logging.error(f"Failed to retrieve data for {election_id}:{election_ocdid}")
            logging.error(error)

    
    # Job status
    logging.info("Completed job fetch voter info.")

In [11]:
#run_voter_info()

In [12]:
#len(active)

In [13]:
#election_id

In [14]:
#active.head(3)

In [15]:
# Initiate job
civic = VoterInfo() 
    
# Load elections, address data
logging.info("Load list of current elections.")
elections = civic.load_current_elections("current_elections",  "current_elections.json")

INFO:root:Load list of current elections.
INFO:root:Successfully loaded current elections data.


In [16]:
print(len(elections))
election = elections[1]
election

19


{'id': '4897',
 'name': 'South Carolina Presidential Primary',
 'electionDay': '2020-02-29',
 'ocdDivisionId': 'ocd-division/country:us/state:sc'}

In [17]:
def publish_active_elections():
    """
    Publishes elections to Pub/Sub topic with an error handler.
    Data included in attributes of messsage: 
            - election_id=election['id'],
            - name=election['name'], 
            - electionDay=election['electionDay'],
            - ocdDivisionId=election['ocdDivisionId']
    
    """
    def get_callback(f, data):
        def callback(f):
            try:
                logging.info(f.result())
                futures.pop(data)
            except:  # noqa
                logging.info("Please handle {} for {}.".format(f.exception(), data))

        return callback
    
    # Job status
    logging.info("Starting job to publish elections.")
    #logging.info("""Trigger: messageId {} published at {}""".format(context.event_id, context.timestamp))
    
    # Initiate job
    civic = VoterInfo()
    date = dt.datetime.now().date()
    
    # Load elections data
    logging.info("Load list of current elections.")
    elections = civic.load_current_elections("current_elections",  "current_elections.json")
    
    project_id = "election-tracker-268319"
    topic_name = "active-elections"

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    futures = dict()
    
    for election in elections: 
        # Set message attributes 
        
        data = str(election['id'])
        
        # When you publish a message, the client returns a future. Data must be a bytestring.
        futures.update({data: None})
        
        future = publisher.publish(
            topic_path, 
            data=data.encode("utf-8"),
            election_id=election['id'],
            name=election['name'], 
            electionDay=election['electionDay'],
            ocdDivisionId=election['ocdDivisionId']
        )
                                     
        futures[data] = future
        # Publish failures shall be handled in the callback function.
        future.add_done_callback(get_callback(future, data))

    # Wait for all the publish futures to resolve before exiting.
    while futures:
        time.sleep(5)

    print(f"Published active elections for current elections as of {str(date)}")
    

In [18]:
publish_active_elections()

INFO:root:Starting job to publish elections.
INFO:root:Load list of current elections.
INFO:root:Successfully loaded current elections data.
INFO:root:443459523184583
INFO:root:443459523184584
INFO:root:443459523184585
INFO:root:443459523184586
INFO:root:443459523184587
INFO:root:443459523184588
INFO:root:443459523184589
INFO:root:443459523184590
INFO:root:443459523184591
INFO:root:443459523184592
INFO:root:443459523184593
INFO:root:443459523184594
INFO:root:443459523184595
INFO:root:443459523184596
INFO:root:443459523184597
INFO:root:443459523184598
INFO:root:443459523184599
INFO:root:443459523184600
INFO:root:443459523184601


Published active elections for current elections as of 2020-02-24


In [62]:
def publish_active_divisions(event, context):
    """
    Publishes parsed election data by division to a Pub/Sub topic with an error handler.
    
    For each division associated with an election, message includes: 
        - election_id=election_id, # As returned by Civic Information API 
        - address=address, # Address of geo division associated with election parsed from locales data.
        - geo_id=geo_id # Fips code or similar geodivision identifier as parsed from locales data
        
    """
    
    def get_callback(f, data):
        def callback(f):
            try:
                logging.info(f.result())
                futures.pop(data)
            except:  # noqa
                logging.info("Please handle {} for {}.".format(f.exception(), data))

        return callback
    
    # Job status
    logging.info("Starting job to parse election.")
    logging.info("""Trigger: messageId {} published at {}""".format(context.event_id, context.timestamp))
    
    # Initiate job
    civic = VoterInfo() 
    
    logging.info("Load addreses by locale")
    locales = civic.load_address_locales("address_locales",  "addresses_county.csv")
    
    project_id = "election-tracker-268319"
    topic_name = "active-divisions"

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    futures = dict()
    
    # Parse election 
    election = event['attributes']
    election_id = election['election_id'] #renamed to avoid conflict
    election_name = election['name']
    election_ocdid = election['ocdDivisionId']

    # Get state abbr from OCDid
    election_ocdid = election_ocdid.split("/")[-1].split(":")[-1].upper()

    logging.debug(f"election_id: {election_id}")
    logging.debug(f"election_ocdid: {election_ocdid}")
    logging.debug(f"election_name: {election_name}")

    # Subset data by OCDid
    # Except test election 
    if election_name == 'VIP Test Election':
        logging.debug("Election name 'VIP Test Election' excluded.")
        return
    # If election is national, return data for all records
    elif election_ocdid == 'US':
        active = locales.copy()
    # If election is statewide, return data for all records in state. 
    else: 
        active = locales.loc[locales['state_abbr'] == election_ocdid, :].copy()
    # Ensure active elections not null
    try: 
        assert(active.empty == False)
    except Exception as e: 
        logging.error("Unable to subset data by OCDid.")
        raise

    # publish active division
    for index, row in active.iterrows():
        data = str(row["fips"])
        
        futures.update({data: None})

        # When you publish a message, the client returns a future. Data must be a bytestring.
        future = publisher.publish(
            topic_path, 
            data=data.encode("utf-8"),
            election_id=election_id, 
            address=row['address'], 
            geo_id=str(row["fips"])
        )
        futures[data] = future
        # Publish failures shall be handled in the callback function.
        future.add_done_callback(get_callback(future, data))

    # Wait for all the publish futures to resolve before exiting.
    while futures:
        time.sleep(5)

    logging.info(f"Published active divisions for election {election_id}")

In [63]:
from google.cloud import pubsub
subscriber = pubsub.SubscriberClient()
# Substitute PROJECT, SUBSCRIPTION, and TOPIC with appropriate values for
# your application.


sub_path = subscriber.subscription_path("election-tracker-268319", 'test_elections')
topic_path = subscriber.topic_path("election-tracker-268319", 'active-elections')
#subscriber.create_subscription(sub_path, topic_path)

In [64]:
# Substitute PROJECT and SUBSCRIPTION with appropriate values for your
# application.
subscription_path = subscriber.subscription_path("election-tracker-268319", 'test_elections')
response = subscriber.pull(subscription_path, max_messages=5)

for msg in response.received_messages:
    print("Received message:", msg.message.data)

ack_ids = [msg.ack_id for msg in response.received_messages]
subscriber.acknowledge(subscription_path, ack_ids)

Received message: b'4900'
Received message: b'4901'
Received message: b'4904'
Received message: b'4907'
Received message: b'4908'


In [65]:
class Context(): 
    def __init__(self):
        self.event_id = '1234'
        self.timestamp = 'somedatetime'

In [66]:
context = Context()
context.event_id

event = {}
event['data'] = 'some message'
event['attributes'] = msg.message.attributes

In [71]:
if 'attributes' in event: 
    print(True)

True


In [70]:
logging.info(f"{str(event)}")

INFO:root:{'data': 'some message', 'attributes': {'electionDay': '2020-03-03', 'name': 'Oklahoma Presidential Primary Election', 'ocdDivisionId': 'ocd-division/country:us/state:ok', 'election_id': '4908'}}


In [67]:
publish_active_divisions(event, context)

INFO:root:Starting job to parse election.
INFO:root:Trigger: messageId 1234 published at somedatetime
INFO:root:Load addreses by locale
INFO:root:Successfully loaded address lookup data.
INFO:root:443460995406875
INFO:root:443460995406876
INFO:root:443461061823569
INFO:root:443460995406877
INFO:root:443461061823570
INFO:root:443460995406878
INFO:root:443461061823571
INFO:root:443460995406879
INFO:root:443461061823572
INFO:root:443460995406880
INFO:root:443461061823573
INFO:root:443460995406881
INFO:root:443461061823574
INFO:root:443460995406882
INFO:root:443461061823575
INFO:root:443460995406883
INFO:root:443461061823576
INFO:root:443460995406884
INFO:root:443461061823577
INFO:root:443461061823578
INFO:root:443460995406885
INFO:root:443461061823579
INFO:root:443460995406886
INFO:root:443461061823580
INFO:root:443461061823581
INFO:root:443461061823582
INFO:root:443461061823583
INFO:root:443461061823584
INFO:root:443461061823585
INFO:root:443461061823586
INFO:root:443461061823587
INFO:ro

In [50]:
# Substitute PROJECT and SUBSCRIPTION with appropriate values for your
# application.
subscription_path = subscriber.subscription_path("election-tracker-268319", 'test_divisions')
response = subscriber.pull(subscription_path, max_messages=5)

for msg in response.received_messages:
    print("Received message:", msg.message.data)

ack_ids = [msg.ack_id for msg in response.received_messages]
subscriber.acknowledge(subscription_path, ack_ids)

Received message: b'08011'
Received message: b'08013'
Received message: b'08014'
Received message: b'08015'
Received message: b'08017'


In [51]:
context = Context()
context.event_id

event = Event(msg.message.attributes)
event.data

{'address': 'Co Rd 31, Cheyenne Wells, CO 80810, USA', 'geo_id': '08017', 'election_id': ''}

In [98]:
# Def fetch election information
def run_voter_info(event, context): 
    """
    Retrieves voter information from Google Civic API
    Takes: 
        Data returned from the active-divisons topic message: 
        - election_id=election_id, # As returned by Civic Information API 
        - address=address, # Address of geo division associated with election parsed from locales data.
        - geo_id=geo_id # Fips code or similar geodivision identifier as parsed from locales data
    Makes the API Call 
    Saves the data to Google Cloud Storage
    """
    
    # Job status
    logging.info("Starting job to fetch voter information.")
    logging.info("""Trigger: messageId {} published at {}""".format(context.event_id, context.timestamp))
    
    division = event.data
    address = division['address']
    geo_id = division['geo_id']
    election_id = division['election_id']
    
    civic = VoterInfo() 
    
    try: 
        logging.info(f"Start VoterInfo call: {election_id}:{geo_id}") 
        response = civic.fetch_voter_info(address, election_id)
        response['geoid'] = {"fips":geo_id}
        civic.save_voter_info(geo_id, response, bucket_name="current_contests")
        time.sleep(1)
        logging.debug(f"Completed VoterInfo call.")
    except Exception as error: 
        logging.error(f"Failed to retrieve data for {election_id}:{geo_id}")
        logging.error(error)

In [99]:
run_voter_info(division)

INFO:root:Start VoterInfo call: 4939:44009
ERROR:root:Successfully saved data for 44009 to: gs://current_contests/44009_2020-02-21.json
