In [53]:
import os
from datetime import datetime, timedelta
from parsons import VAN, Redshift, Table
from canalespy import logger, setup_environment

In [54]:
setup_environment()
rs = Redshift()
van = VAN(api_key='f49f4c62-439c-362d-eae2-e6b32ccb8431', db='EveryAction')

In [55]:
LOG_TABLE = 'txop_newmode.calls_to_ea_log'
SOURCE_TABLE = 'txop_newmode.call_summary_to_ea'
EA_EVENT_SUMMARY = 'txop_newmode.ea_event_summary'
TIMESTAMP = datetime.now() # timestamp to be used for logtable
SURVEY_QUESTION_ID = 163740 # issue campaign
CONTACT_TYPE_ID = 1 # phone
ROLE_ID = 100541 # talked to politician
STATUS_ID = 2 # complete
LOCATION_ID = 107 # DA only

In [56]:
REDSHIFT_USERNAME = os.environ['REDSHIFT_USERNAME']
REDSHIFT_PASSWORD = os.environ['REDSHIFT_PASSWORD']
REDSHIFT_HOST = os.environ['REDSHIFT_HOST']
REDSHIFT_DB = os.environ['REDSHIFT_DB']
REDSHIFT_PORT = os.environ['REDSHIFT_PORT']
S3_TEMP_BUCKET= os.environ['S3_TEMP_BUCKET']

In [10]:
# Setup logging
if not rs.table_exists(LOG_TABLE): # if log table does not already exist, create it
    # SQL to create the log table
    sql = "(outreach_id int, vanid varchar(255), date_created timestamp, sr_load varchar(255), signup_id varchar(255));"
    rs.query(f"create table {LOG_TABLE} {sql}")
    logger.info(f"Creating {LOG_TABLE}...")

    # Grab the source data
    # DA events occur once a week on wednesdays, so we need to convert action dates to the nearest event date for that week to do the final join 
source_tbl = rs.query(f'''
with base as (
  SELECT a.*
  FROM (SELECT *, row_number() OVER(partition by outreach_id order by action_date desc) as dup
        FROM {SOURCE_TABLE}) a
  LEFT JOIN {LOG_TABLE} l ON a.outreach_id = l.outreach_id
  WHERE l.outreach_id is NULL
  AND dup=1
)

, da_event_date_adjust as (
  SELECT *,
    case
      when datepart(dow, action_date::datetime) = 7 then dateadd(day, 3, action_date::date)::varchar
      when datepart(dow, action_date::datetime) = 1 then dateadd(day, 2, action_date::date)::varchar
      when datepart(dow, action_date::datetime) = 2 then dateadd(day, 1, action_date::date)::varchar
      when datepart(dow, action_date::datetime) = 4 then dateadd(day, -1, action_date::date)::varchar
      when datepart(dow, action_date::datetime) = 5 then dateadd(day, -2, action_date::date)::varchar
      when datepart(dow, action_date::datetime) = 6 then dateadd(day, -3, action_date::date)::varchar
      else action_date end as da_date_adjust
  FROM base
  WHERE event_type ilike 'da'
  AND day_rank = 1
)

, final as (
    SELECT a.*, b.da_date_adjust
    FROM base a
    LEFT JOIN da_event_date_adjust b USING(outreach_id)
)

SELECT a.*
      , e.eventid
      , e.eventshiftid
FROM final a
LEFT JOIN {EA_EVENT_SUMMARY} e ON e.dateoffsetbegin::date = a.action_date::date AND e.eventcalendarid = a.event_calendar_id
WHERE da_date_adjust is NULL

UNION

SELECT a.*
      , e.eventid
      , e.eventshiftid
FROM final a
LEFT JOIN {EA_EVENT_SUMMARY} e ON e.dateoffsetbegin::date = a.da_date_adjust::date AND e.eventcalendarid = a.event_calendar_id 
WHERE da_date_adjust is NOT NULL
'''
)

logger.info(f"{source_tbl.num_rows} rows found.")

INFO 0 rows found.


In [11]:
# If there are new records, create a list with the column names for our eventual Parsons Table of what we've loaded
if source_tbl.num_rows > 0:
    logger.info(f"{source_tbl.num_rows} new records found. Inserting into table")
    loaded = [['outreach_id', 'vanid', 'datecreated','sr_load', 'signup_id']]

    # Handle all of the date conversions we will need -- 2-4 are only needed as a backup if we need to use the events API:

    # 1. Convert action date to iso format, and add it to a new column
    source_tbl.add_column(
      'action_date_iso',
      lambda row: datetime.strptime(row['action_date'], '%Y-%m-%d %H:%M:%S').isoformat()
    )

    # 2. Convert action date to datetime date, and add it to a new column. Uses converted DA event date if available. 
    source_tbl.add_column(
      'action_date_date',
      lambda row: datetime.strptime(row['da_date_adjust'], '%Y-%m-%d %H:%M:%S').date() if row['da_date_adjust'] else datetime.strptime(row['action_date'], '%Y-%m-%d %H:%M:%S').date()
    )

    # 3. Substract one day from the action date, convert the output to a string, and add it to a new column
    source_tbl.add_column(
      'action_date_after',
      lambda row: str(row['action_date_date'] - timedelta(days=1))
    )

    # 4. Add/substract one day from the action date, convert the output to a string, and add it to a new column
    source_tbl.add_column(
      'action_date_before',
      lambda row: str(row['action_date_date'] + timedelta(days=1))
    )

In [12]:
source_tbl

outreach_id,action_date,first_name,last_name,email,phone,zip,parent_url,survey_response_id,event_type,event_calendar_id,call_status,call_hangup_reason,day_rank,dup,da_date_adjust,eventid,eventshiftid


In [16]:
for ppl in source_tbl:
      logger.info(f"Working on outreach_id={ppl['outreach_id']}")

      # Find contacts in EveryAction based on their contact information in new/mode
      try:
        ea = van.find_person(first_name=ppl['first_name']
                            ,last_name=ppl['last_name']
                            ,email=ppl['email']
                            ,phone=ppl['phone']
                            ,zip=ppl['zip'])
      except Exception as e:
        logger.info(f"Issue finding in VAN! {str(e)}")
        ea = e

      logger.info(f"Found in EveryAction as {ea['vanId']}...")

      # Apply Survey Response
      try:
        sr = van.apply_survey_response(id=ea['vanId'],
                              id_type='vanid',
                              survey_question_id=SURVEY_QUESTION_ID,
                              survey_response_id=ppl['survey_response_id'],
                              contact_type_id=CONTACT_TYPE_ID, 
                              date_canvassed=ppl['action_date_iso'])
      except Exception as e:
        logger.info(f"Couldn't apply survey response")
        sr = e
        
      # Add  Event Sign Up
      signup_id = None

      # If the event exists in our summary table:
      if ppl['day_rank'] == 1:

        if ppl['eventid'] is None:
          # Find the recurring event that occurs on the day the new/mode action took place and store the event_id as a variable
          find_event = van.get_events(event_type_ids=ppl['event_calendar_id'],starting_after=ppl['action_date_after'], starting_before=ppl['action_date_before'])              
          event_id = find_event['eventId'][0]

          # Find the shift id for our event
          shifts = find_event['shifts'][0][0]
          shift_id = shifts['eventShiftId']
        else:
          event_id = ppl['eventid']
          shift_id = ppl['eventshiftid']
            
        if ppl['event_type'] == 'DA':
          location_id = 107
        else:
          location_id = None
      
        try: 
          signup_id = van.create_signup(vanid=ea['vanId'],
                                    event_id=event_id, 
                                    shift_id=shift_id, 
                                    role_id = ROLE_ID, 
                                    status_id = STATUS_ID,
                                    location_id = location_id)
        except Exception as e:
          logger.info(f"Couldn't add event signup!")
          signup_id = e

      # Create a list of key information for log table
      vanids = [ppl['outreach_id'], str(ea['vanId']),TIMESTAMP,str(sr),str(signup_id)]
    
      # Append list to log table
      loaded.append(vanids)

logger.info(f"Finished the loop! Now to load the work we've done into {LOG_TABLE}")
Table(loaded).to_redshift(LOG_TABLE,if_exists='append', )

INFO Working on outreach_id=10656058
people INFO Finding David German.
INFO Found in EveryAction as 102022502...
survey_questions INFO Applying survey question 163740 to vanid 102022502
signups INFO Signup {r} created.
INFO Working on outreach_id=10652185
people INFO Finding Tracy Scott.
INFO Found in EveryAction as 102022491...
survey_questions INFO Applying survey question 163740 to vanid 102022491
signups INFO Signup {r} created.
INFO Finished the loop! Now to load the work we've done into txop_newmode.calls_to_ea_log
redshift INFO Data copied to txop_newmode.calls_to_ea_log.


In [26]:
for ppl in source_tbl:
      logger.info(f"Working on outreach_id={ppl['outreach_id']}")

      # Find contacts in EveryAction based on their contact information in new/mode
      try:
        ea = van.find_person(first_name=ppl['first_name']
                            ,last_name=ppl['last_name']
                            ,email=ppl['email']
                            ,phone=ppl['phone']
                            ,zip=ppl['zip'])
      except Exception as e:
        logger.info(f"Issue finding in VAN! {str(e)}")
        ea = e

      logger.info(f"Found in EveryAction as {ea['vanId']}...")

      # Apply Survey Response
      try:
        sr = van.apply_survey_response(id=ea['vanId'],
                              id_type='vanid',
                              survey_question_id=SURVEY_QUESTION_ID,
                              survey_response_id=ppl['survey_response_id'],
                              contact_type_id=CONTACT_TYPE_ID, 
                              date_canvassed=ppl['action_date_iso'])
      except Exception as e:
        logger.info(f"Couldn't apply survey response")
        sr = e
      print(response)

INFO Working on outreach_id=10656058
people INFO Finding David German.
INFO Found in EveryAction as 102022502...
survey_questions INFO Applying survey question 163740 to vanid 102022502


NameError: name 'response' is not defined

In [49]:
for ppl in source_tbl:
      logger.info(f"Working on outreach_id={ppl['outreach_id']}")

      # Find contacts in EveryAction based on their contact information in new/mode
      try:
        ea = van.find_person(first_name=ppl['first_name']
                            ,last_name=ppl['last_name']
                            ,email=ppl['email']
                            ,phone=ppl['phone']
                            ,zip=ppl['zip'])
        ea_id = str(ea['vanId'])
        logger.info(f"Found in EveryAction as {ea_id}...")
      except Exception as e:
        logger.info(f"Issue finding in VAN! {str(e)}")
        ea_id = str(e)

In [67]:
# Find contacts in EveryAction based on their contact information in new/mode
try:
    ea = van.find_person(first_name='Carmela'
                    ,last_name='Thomas'
                    ,email='carmelathomas69@gmail.com'
                    ,phone='6822050577'
                    ,zip='76133')
    ea_id = str(ea['vanId'])
    logger.info(f"Found in EveryAction as {ea_id}...")
except Exception as e:
    logger.info(f"Issue finding in VAN! {str(e)}")
    ea_id = e



people INFO Finding Carmela Thomas.
INFO:parsons.ngpvan.people:Finding Carmela Thomas.
INFO Issue finding in VAN! HTTP error occurred (404): Not Found, json: {'vanId': None, 'status': 'Unmatched'}
INFO:canalespy:Issue finding in VAN! HTTP error occurred (404): Not Found, json: {'vanId': None, 'status': 'Unmatched'}


NameError: name 'ea' is not defined

In [None]:
for ppl in source_tbl:
      logger.info(f"Working on outreach_id={ppl['outreach_id']}")

      # Find contacts in EveryAction based on their contact information in new/mode
      try:
        ea = van.find_person(first_name=ppl['first_name']
                            ,last_name=ppl['last_name']
                            ,email=ppl['email']
                            ,phone=ppl['phone']
                            ,zip=ppl['zip'])
        print(ea)
#       except Exception as e:
#         logger.info(f"Issue finding in VAN! {str(e)}")
#         vanids = [ppl['outreach_id'],str(e),TIMESTAMP,NULL,NULL]

#       logger.info(f"Found in EveryAction as {ea['vanId']}...")

#       # Apply Survey Response
#       try:
#         sr = van.apply_survey_response(id=ea['vanId'],
#                               id_type='vanid',
#                               survey_question_id=SURVEY_QUESTION_ID,
#                               survey_response_id=ppl['survey_response_id'],
#                               contact_type_id=CONTACT_TYPE_ID, 
#                               date_canvassed=ppl['action_date_iso'])
#       except Exception as e:
#         logger.info(f"Couldn't apply survey response")
#         vanids = [ppl['outreach_id'],ea['vanId'], TIMESTAMP, str(e),NULL]

#       # Add  Event Sign Up
#       signup_id = None

#       # If the event exists in our summary table:
#       if ppl['day_rank'] == 1:

#         if ppl['eventid'] is None:
#           api = True
#           # Find the recurring event that occurs on the day the new/mode action took place and store the event_id as a variable
#           find_event = van.get_events(event_type_ids=ppl['event_calendar_id'],starting_after=ppl['action_date_after'], starting_before=ppl['action_date_before'])              
#           event_id = find_event['eventId'][0]

#           # Find the shift id for our event
#           shifts = find_event['shifts'][0][0]
#           shift_id = shifts['eventShiftId']
#         else:
#           api = False

#         try: 
#           signup_id = van.create_signup(vanid=ea['vanId'],
#                                     event_id=event_id if api == True else ppl['eventid'], 
#                                     shift_id=shift_id if api == True else ppl['eventshiftid'], 
#                                     role_id = ROLE_ID, 
#                                     status_id = STATUS_ID,
#                                     location_id = None)
#         except Exception as e:
#           logger.info(f"Couldn't add event signup!")
#           vanids = [ppl['outreach_id'],ea['vanId'], TIMESTAMP, sr, str(e)]

#       # Create a list of key information for log table
#       vanids = [ppl['outreach_id'], ea['vanId'],TIMESTAMP,sr,signup_id]

#       # Append list to log table
#       loaded.append(vanids)

# logger.info(f"Finished the loop! Now to load the work we've done into {LOG_TABLE}")
# Table(loaded).to_redshift(LOG_TABLE,if_exists='append')

In [34]:
vanids

[10652185,
 102022491,
 datetime.datetime(2021, 4, 28, 14, 16, 45, 267251),
 None,
 requests.exceptions.HTTPError('HTTP error occurred (400): Bad Request, json: {\'errors\': [{\'code\': \'INVALID_PARAMETER\', \'text\': "\'location\' is required by the specified Event", \'properties\': [\'location\']}]}')]

In [12]:
source_tbl.select_rows(lambda row: row.da_date_adjust)

outreach_id,action_date,first_name,last_name,email,phone,zip,parent_url,survey_response_id,event_type,event_calendar_id,call_status,call_hangup_reason,day_rank,dup,da_date_adjust,eventid,eventshiftid
10490213,2021-04-16 00:13:17,Boyun,Sul,boyunsul0@gmail.com,2148625806,75080,https://organizetexas.org/direct/political/call-out-texas-house-calendars-committee-chair-rep.-dustin-burrows?preview=true,1845072,DA,164850,completed,Call ended successfully,1,1,2021-04-14 00:00:00,750017860,34173
10605773,2021-04-22 23:00:30,Patrick,Davis,iamisomni@gmail.com,7134223829,77026,https://organizetexas.org/direct/political/call-out-texas-house-calendars-committee-chair-rep.-dustin-burrows?sp_ref=709213383.392.216992.f.0.2,1845072,DA,164850,completed,Call ended successfully,1,1,2021-04-21 00:00:00,750017861,34174
10488558,2021-04-15 21:07:30,Devin,Branch,dbranch@organizetexas.org,8327052291,77004,https://organizetexas.org/direct/political/call-out-texas-house-calendars-committee-chair-rep.-dustin-burrows?preview=true,1845072,DA,164850,completed,Call ended successfully,1,1,2021-04-14 00:00:00,750017860,34173
10489663,2021-04-15 23:14:28,Esther,Tijerina,esttijerina78202@gmail.com,2102144902,78202,https://organizetexas.org/direct/political/call-out-texas-house-calendars-committee-chair-rep.-dustin-burrows?preview=true,1845072,DA,164850,completed,Call ended successfully,1,1,2021-04-14 00:00:00,750017860,34173


In [11]:
source_tbl.select_rows(lambda row: row.eventid is None)

outreach_id,action_date,first_name,last_name,email,phone,zip,parent_url,survey_response_id,event_type,event_calendar_id,call_status,call_hangup_reason,day_rank,dup,da_date_adjust,eventid,eventshiftid
10489692,2021-04-15 23:17:30,Esther,Tijerina,esttijerina78202@gmail.com,2102144902,78202,https://organizetexas.org/direct/political/call-out-texas-house-calendars-committee-chair-rep.-dustin-burrows?preview=true,1845072,DA,164850,completed,Call ended successfully,2,1,,,


In [14]:
# 2. Convert action date to datetime date, and add it to a new column
source_tbl.add_column(
  'action_date_date',
  lambda row: datetime.strptime(row['da_date_adjust'], '%Y-%m-%d %H:%M:%S').date() if row['da_date_adjust'] else datetime.strptime(row['action_date'], '%Y-%m-%d %H:%M:%S').date()
)

outreach_id,action_date,first_name,last_name,email,phone,zip,parent_url,survey_response_id,event_type,event_calendar_id,call_status,call_hangup_reason,day_rank,dup,da_date_adjust,eventid,eventshiftid,action_date_date
10490213,2021-04-16 00:13:17,Boyun,Sul,boyunsul0@gmail.com,2148625806,75080,https://organizetexas.org/direct/political/call-out-texas-house-calendars-committee-chair-rep.-dustin-burrows?preview=true,1845072,DA,164850,completed,Call ended successfully,1,1,2021-04-14 00:00:00,750017860,34173,2021-04-14
10486574,2021-04-15 17:59:17,Elizabeth,Gregory,elizabeth_gregory@earthlink.net,7132568310,77030,https://organizetexas.org/healthcare/call-speaker-phelan-now?preview=true&link_id=3&can_id=2c8e13b63ecd188ddb63b439abaa755c&source=email-action-alert-new-momentum-on-a-healthcare-expansion-bill-at-the-legislature-2&email_referrer=email_1142928&email_subject=action-alert-call-speaker-phelan-now,695590,MDW,128562,completed,Call ended successfully,1,1,,750015273,31580,2021-04-15
10470450,2021-04-14 19:45:09,James,Neal,james@jamesedwardneal.com,5129053931,76006,https://organizetexas.org/political_call-out-house-speaker-dade-phelan/,1845072,MDW,128562,completed,Call ended successfully,1,1,,750015272,31579,2021-04-14
10488023,2021-04-15 20:10:33,Holly,herzer,hlherz@aol.com,2147241670,78613,https://organizetexas.org/healthcare/call-speaker-phelan-now?preview=true&link_id=3&can_id=7f9a1a6dfca67e4fdb37a8b316b2eddf&source=email-action-alert-new-momentum-on-a-healthcare-expansion-bill-at-the-legislature-2&email_referrer=email_1142928&email_subject=action-alert-call-speaker-phelan-now,695590,MDW,128562,completed,Call ended successfully,1,1,,750015273,31580,2021-04-15
10486560,2021-04-15 17:57:59,Anita,Garcia,gar04317@yahoo.com,5128261371,78749,https://organizetexas.org/healthcare/call-speaker-phelan-now?preview=true&link_id=3&can_id=907fa04fcace94d5702469b052cd834c&source=email-action-alert-new-momentum-on-a-healthcare-expansion-bill-at-the-legislature-2&email_referrer=email_1142928&email_subject=action-alert-call-speaker-phelan-now,695590,MDW,128562,completed,Call ended successfully,1,1,,750015273,31580,2021-04-15


In [32]:
date_adjust = rs.query(f'''
select action_date::date, 
case 
	when datepart(dow, action_date::date) = 1 then dateadd(day, 2, action_date::date)
    when datepart(dow, action_date::date) = 2 then dateadd(day, 1, action_date::date)
    when datepart(dow, action_date::date) = 4 then dateadd(day, -1, action_date::date)
    when datepart(dow, action_date::date) = 5 then dateadd(day, -2, action_date::date)
    when datepart(dow, action_date::date) = 6 then dateadd(day, -3, action_date::date)
    when datepart(dow, action_date::date) = 7 then dateadd(day, 3, action_date::date)
    else action_date::date end as date_adjust
from txop_newmode.call_summary_to_ea
where event_type ilike 'da'
order by action_date::date
''')

In [33]:
date_adjust

action_date,date_adjust
2021-04-15,2021-04-14 00:00:00
2021-04-15,2021-04-14 00:00:00
2021-04-15,2021-04-14 00:00:00
2021-04-16,2021-04-14 00:00:00
2021-04-22,2021-04-21 00:00:00


In [29]:
for ppl in source_tbl:
    if ppl['day_rank'] == 1:

        if ppl['eventid'] is None:
            print(ppl)

{'outreach_id': 10489663, 'action_date': '2021-04-15 23:14:28', 'first_name': 'Esther', 'last_name': 'Tijerina', 'email': 'esttijerina78202@gmail.com', 'phone': '2102144902', 'zip': '78202', 'parent_url': 'https://organizetexas.org/direct/political/call-out-texas-house-calendars-committee-chair-rep.-dustin-burrows?preview=true', 'survey_response_id': 1845072, 'event_type': 'DA', 'event_calendar_id': '164850', 'call_status': 'completed', 'call_hangup_reason': 'Call ended successfully', 'day_rank': 1, 'dup': 1, 'eventid': None, 'eventshiftid': None, 'action_date_iso': '2021-04-15T23:14:28', 'action_date_date': datetime.date(2021, 4, 15), 'action_date_after': '2021-04-14', 'action_date_before': '2021-04-16'}
{'outreach_id': 10488558, 'action_date': '2021-04-15 21:07:30', 'first_name': 'Devin ', 'last_name': 'Branch ', 'email': 'dbranch@organizetexas.org', 'phone': '8327052291', 'zip': '77004', 'parent_url': 'https://organizetexas.org/direct/political/call-out-texas-house-calendars-committ

In [22]:
check_log = rs.query(f'''
select * from txop_newmode.calls_to_ea_log order by date_created desc limit 5
''')

In [23]:
check_log

outreach_id,vanid,date_created,sr_load,signup_id
10795009,"HTTP error occurred (404): Not Found, json: {'vanId': None, 'status': 'Unmatched'}",2021-05-10 19:18:42.949213,local variable 'ea' referenced before assignment,local variable 'ea' referenced before assignment
10774160,101558717,2021-05-07 15:23:09.021429,,122183
10775631,101553941,2021-05-07 15:23:09.021429,,122180
10774233,101532555,2021-05-07 15:23:09.021429,,
10785753,102023242,2021-05-07 15:23:09.021429,,122178


In [95]:
error_log_qa = rs.query(f'''
with base as (
    select
    outreach_id,
    date_created,
    case when len(vanid) >= 30 and vanid ilike '%Unmatched%' then 'Error - Unmatched'
         when vanid is null or len(vanid) >= 30 then 'Error' end as vanid_error,
    case when vanid_error ilike 'Err%' then NULL
         when sr_load is null or len(sr_load) >= 30 then 'Error' end as survey_response,   
    case when vanid_error ilike 'Err%' then NULL
         when signup_id is null or len(signup_id) >= 30 then 'Error' end as event_signup
    from txop_newmode.calls_to_ea_log
    )
select * from base
    where vanid_error ilike 'Err%'
    or survey_response ilike 'Error'
    or event_signup ilike 'Error'
''')

In [96]:
error_log_qa

outreach_id,date_created,vanid_error,survey_response,event_signup
10795009,2021-05-10 19:18:42.949213,Error - Unmatched,,


In [26]:
rs.query(f"select len(vanid) from txop_newmode.calls_to_ea_log order by date_created desc limit 1")

len
82
