#### LATAM QMS : Fast Track Module
##### Run quality/fraud queries on Presto, then tag and add notes to courier partners
###### Runs twice a day, at 09:00 and 16:00 Mexico time (15:00 and 23:00 UTC)

In [1]:
# Eats LATAM QMS : Fast Track
# @ bhama@uber.com, shane.macnamara@uber.com
#
# Setup cell: authorize the Fast Track Actions Control Center spreadsheet
# (global `gs`), define the tracking tag names, and create the TChannel client
# plus the populous Thrift service module used for tagging / notes.

import os  # needed by os.path.join below (was missing: NameError on a fresh kernel)

import pandas as pd
import pygsheets
import ujson
import unidecode
from tchannel import thrift
from tchannel.sync import TChannel

root_dir = '/home/udocker/brunoa/Fast-Track'

# authorize Fast Track Actions Control Center (global)
path_to_google_json = os.path.join(root_dir,'client_secret.json')
gc = pygsheets.authorize(outh_file=path_to_google_json, outh_nonlocal=True)
# gsheet_name = 'LatAm Fast-Track - Actions Control Center'
gsheet_key = '1FvqZ0mmLtaTmJ1xOUFHCUU3F1nPLkMofand0FK6TFPk'
# gs = gc.open(gsheet_name)
gs = gc.open_by_key(gsheet_key)

# Extra tags added next to rule tags containing 'notification' / 'immediatewl'.
notification_tracking_tag = 'latam_fraud_fasttrack_soft_tag'
waitlist_tracking_tag = 'latam_fraud_fasttrack_wl_queue'

# initializing populous services
with open("/etc/uber/hyperbahn/hosts.json") as f:
    known_peers = ujson.load(f)
tchannel = TChannel(name="tcurl", known_peers=known_peers)
populous_service = thrift.load(path=os.path.join(root_dir, "populous.thrift"), service="populous")

In [2]:
def encode_df(element):
    """Normalize a single DataFrame cell before it is written to Google Sheets.

    - Empty or whitespace-only strings become ''.
    - Other strings are transliterated to ASCII with unidecode; if that raises
      UnicodeDecodeError the value is decoded as latin-1 instead.
    - Values without string methods (numbers, None, ...) are returned as-is.
    - Any other object that has ``isspace`` but is not a basestring yields None.
    """
    try:
        blank = element.isspace() or element == ''
        if blank:
            return ''
        if not isinstance(element, basestring):
            return None
        try:
            return unidecode.unidecode(element)
        except UnicodeDecodeError:
            return element.decode('latin1')
    except AttributeError:
        # No string methods on this value: pass it through untouched.
        return element

In [3]:
# check if fast track is switched off
def is_fast_track():
    """Return 'Yes' if the control-center switch allows a run, otherwise 'No'.

    Reads cell C1 of the 'locked_actions' worksheet in the global ``gs``
    spreadsheet; only the exact text 'Run Fast Track' enables the run.
    """
    switch_value = gs.worksheet_by_title('locked_actions').cell('C1').value
    return 'Yes' if switch_value == 'Run Fast Track' else 'No'

In [4]:
# local function to call saved queries from queryrunner_client 
def saved_query(query,database,datestr,rules,city_ids):
    """Execute a saved QueryRunner report and return every row it produces.

    query    -- saved report id.
    database -- accepted for call-site symmetry; not used by this function.
    datestr  -- forwarded as the report's ``datestr`` parameter.
    rules    -- SQL fragment forwarded as ``rule_list``.
    city_ids -- forwarded as ``city_ids`` after ``str()`` conversion.
    """
    from queryrunner_client import Client
    report_params = {
        'rule_list': rules,
        'datestr': datestr,
        'city_ids': str(city_ids),
    }
    client = Client(user_email = 'brunoa@uber.com')
    return client.execute_report(query, parameters = report_params).fetchall()

In [5]:
# query runner engine for building parameters and executing query
def qb_engine(df,query,database,datestr):
    """Build the rule / city parameters for a saved query and execute it.

    Every row of ``df`` becomes one
    ``union select <City ID>, '<Rule>', <Threshold>, <Lookback>`` line appended
    to a seed row that fixes the column names. The distinct city ids, in order
    of first appearance, are passed as a comma-separated string.

    Parameters
    ----------
    df : DataFrame with columns 'City ID', 'Rule', 'Threshold', 'Lookback'.
    query : saved report id forwarded to ``saved_query``.
    database : forwarded to ``saved_query`` unchanged.
    datestr : date string forwarded to ``saved_query``.

    Returns
    -------
    The rows returned by ``saved_query``. Returns ``[]`` when ``df`` has no rows;
    this previously raised UnboundLocalError. Returns the string 'Failed Run' if
    building or executing the query raises.
    """
    try:
        # No cities enabled for this rule group: nothing to query.
        if df.shape[0] == 0:
            return []

        rules = "select 0 as city_id,'rule_name' as rule_name, 0 as threshold, 0 as lookback_period"
        for city_id, rule, threshold, lookback in zip(df['City ID'], df['Rule'], df['Threshold'], df['Lookback']):
            # Double single quotes so a rule name containing ' stays a valid SQL literal.
            rule_literal = str(rule).replace("'", "''")
            rules += " union select " + str(city_id) + ", '" + rule_literal + "', " + str(threshold) + ", " + str(lookback)

        # One city or many: a single id joined with ',' is just that id.
        city_ids = ','.join(map(str, df['City ID'].unique()))

        return saved_query(query, database, datestr, rules, city_ids)
    except Exception:
        # Best effort: report failure to the caller instead of aborting the run.
        # (Exception, not a bare except, so kernel interrupts still propagate.)
        return 'Failed Run'

In [6]:
# cool off and query runner engine for building parameters and executing cool off logic
def co_engine(df,query,database,datestr):
    """Build and run the cool-off query for cities that have notification rules.

    Only rows whose 'Rule' contains "notification" are considered. For each of
    their cities, the largest 'Lookback' becomes that city's cool-off window,
    sent as ``union select <City ID>, <Lookback>`` lines after a seed row.

    Returns
    -------
    The rows returned by ``saved_query``. Returns ``[]`` when no notification
    rules are present; this previously raised UnboundLocalError. Returns
    'Failed Run' if building or executing the query raises.
    """
    try:
        # na=False: blank / NaN Rule cells simply don't match.
        notif = df[df['Rule'].str.contains("notification", na=False)]
        if notif.shape[0] == 0:
            return []

        # Max lookback per city. sort=False keeps first-appearance order, the
        # same parameter order the old row-by-row loop produced.
        lookbacks = notif.groupby('City ID', sort=False)['Lookback'].max()

        rules = "select 0 as city_id, 0 as lookback_period"
        for city_id, lookback in zip(lookbacks.index, lookbacks.values):
            rules += " union select " + str(city_id) + ", " + str(lookback)
        city_ids = ','.join(str(city_id) for city_id in lookbacks.index)

        return saved_query(query, database, datestr, rules, city_ids)
    except Exception:
        # Best effort: report failure to the caller instead of aborting the run.
        return 'Failed Run'

In [7]:
# load city / rule settings for fast track
def load_city_settings():
    """Read the 'fast_track' worksheet and split it on its 'Active' column.

    Returns (enabled, shadow): rows whose 'Active' equals 'Enable' and rows
    whose 'Active' equals 'Shadow'. Both keep the original row index.
    """
    records = gs.worksheet_by_title('fast_track').get_all_records()
    settings = pd.DataFrame(records)
    active = settings['Active']
    shadow = settings[active == 'Shadow']
    enabled = settings[active == 'Enable']
    return enabled, shadow

In [8]:
# write to fast track courier log 
# def courier_log(df_final,gsheet_name):
def _encode_frame(frame):
    """Apply encode_df to every cell (DataFrame.applymap became DataFrame.map in pandas 2.1)."""
    if hasattr(pd.DataFrame, 'map'):
        return frame.map(encode_df)
    return frame.applymap(encode_df)


def _copy_sheet(gs, src_title, dst_title):
    """Replace worksheet ``dst_title`` with the encoded contents of worksheet ``src_title``."""
    df_temp = gs.worksheet_by_title(src_title).get_as_df(has_header=True)
    ws = gs.worksheet_by_title(dst_title)
    ws.clear()
    ws.set_dataframe(_encode_frame(df_temp),(1,1),copy_index=False, copy_head=True, fit=True)


def courier_log(df_final,gsheet_key,run_cycle=None):
    """Write a run's courier rows to the log spreadsheet identified by ``gsheet_key``.

    When run_cycle == 1, the history sheets are shifted one slot
    (today_5 -> today_6, ..., today -> today_1; the old today_6 content is
    overwritten), and then 'today' is replaced with ``df_final``. For any other
    run_cycle, ``df_final`` (with its header row) is written below the rows
    already on 'today'.

    df_final   -- DataFrame to log; every cell is passed through encode_df first.
    gsheet_key -- Google Sheets key of the log spreadsheet.
    run_cycle  -- 1 to rotate, anything else to append. When omitted, the
                  notebook-level ``run_cycle`` global is used (the previous
                  implicit behaviour). Raises NameError if neither is set.
    """
    if run_cycle is None:
        run_cycle = globals().get('run_cycle')
        if run_cycle is None:
            raise NameError("run_cycle is not defined; pass run_cycle= or set it before calling courier_log")

    df_final = _encode_frame(df_final)
    gs = gc.open_by_key(gsheet_key)

    if run_cycle == 1:
        # Shift oldest-first so each sheet is copied before it gets overwritten.
        history = ['today', 'today_1', 'today_2', 'today_3', 'today_4', 'today_5', 'today_6']
        for src_title, dst_title in reversed(list(zip(history[:-1], history[1:]))):
            _copy_sheet(gs, src_title, dst_title)

        ws = gs.worksheet_by_title('today')
        ws.clear()
        ws.set_dataframe(df_final,(1,1),copy_index=False, copy_head=True, fit=True)
    else:
        ws = gs.worksheet_by_title('today')
        existing = ws.get_as_df(has_header=True)
        # Assumes a single header row: data occupies rows 2..n+1, so n+2 is the first free row.
        ws.set_dataframe(df_final,(existing.shape[0]+2,1),copy_index=False, copy_head=True, fit=True)

In [9]:
# Build data frames by group query rules for enabled and shadow mode rules/cities
def _rule_group(settings, families):
    """Rows of ``settings`` for each rule family's '(notification)' and '(immediate WL)' rules, family by family."""
    parts = []
    for family in families:
        names = [family + ' (notification)', family + ' (immediate WL)']
        parts.append(settings[settings['Rule'].isin(names)])
    if len(parts) == 1:
        return parts[0]
    # pd.concat replaces DataFrame.append (removed in pandas 2.0); row order and index are kept.
    return pd.concat(parts)


def rule_engine(df, df_s):
    """Split enabled and shadow settings into one frame per saved-query rule group.

    Each group bundles rule families that share a query. Every family
    contributes its '<family> (notification)' and '<family> (immediate WL)'
    rows, concatenated in the order the families are listed below.

    Returns the 9-tuple (df_1, ..., df_5, df_s_1, ..., df_s_4): five enabled
    groups followed by four shadow groups.
    NOTE(review): there is no shadow frame for group 5 ('Missing Items'). This
    is kept as-is so the 9-value unpacking at the call site still works.
    """
    groups = [
        ['Fraud trips', 'Failed trips', 'NRO trips'],    # 1
        ['Cancels at Restaurant'],                        # 2
        ['Full Day Failures'],                            # 3
        ['Failure at same location', 'No movement'],      # 4
        ['Missing Items'],                                # 5
    ]
    enabled = [_rule_group(df, families) for families in groups]
    shadow = [_rule_group(df_s, families) for families in groups[:4]]
    return tuple(enabled + shadow)

In [10]:
def update_status(rd,rule,time):
    """Append one status row (rule, time, outcome) to the 'locked_actions' worksheet.

    rd   -- result of qb_engine / co_engine: a sequence of rows, or 'Failed Run'.
    rule -- label written to column A.
    time -- timestamp string written to column B.

    Column C gets 'Query Timeout' when rd is 'Failed Run' or empty, and
    'Success' otherwise.
    NOTE(review): 'Query Timeout' is written for any failure or empty result,
    not only for actual timeouts.
    """
    ws = gs.worksheet_by_title('locked_actions')
    # The new row goes right below the rows returned by get_all_values.
    row = str(len(ws.get_all_values(returnas='matrix')) + 1)
    ws.add_rows(1)

    ws.cell("A" + row).value = rule
    ws.cell("B" + row).value = time

    if rd == "Failed Run" or len(rd) == 0:
        ws.cell("C" + row).value = 'Query Timeout'
    else:
        ws.cell("C" + row).value = 'Success'

In [11]:
# check existence of notification tag, and tag / note only for new courier partners
def tag_notes_courier(rd):
    """Tag and note each courier whose rule tag is not already present.

    rd is a sequence of dict rows, each with at least 'driver_uuid', 'tag' and
    'note'. The getUserTag lookup decides each row's outcome:

    - If ``.result()`` on the lookup succeeds, the row's 'Status' becomes
      'Not Actioned'.
    - If it raises, the row's 'Status' becomes 'Actioned'. The rule tag and the
      note are then created, plus the tracking tag when the tag contains
      'notification' or 'immediatewl'.

    The rows are mutated in place, and ``rd`` itself is returned.
    """
    for row in rd:
        uuid = row['driver_uuid']
        tag = row['tag']
        lookup = tchannel.thrift(populous_service.UserService.getUserTag(uuid, tag))
        try:
            lookup.result()
        except Exception:
            # NOTE(review): any lookup failure, not only "tag not found", is treated
            # as "tag missing", so a transient error leads to re-actioning.
            row['Status'] = "Actioned"
            # The results of these write calls are not checked; a failed write
            # still leaves the row marked 'Actioned'.
            tchannel.thrift(populous_service.UserService.createUserTag(uuid, tag))
            tchannel.thrift(populous_service.UserService.createUserNote(uuid, row['note']))

            if 'notification' in tag:
                tchannel.thrift(populous_service.UserService.createUserTag(uuid, notification_tracking_tag))

            if 'immediatewl' in tag:
                tchannel.thrift(populous_service.UserService.createUserTag(uuid, waitlist_tracking_tag))
        else:
            row['Status'] = "Not Actioned"
    return rd

In [12]:
# Main run: check the kill switch, run the enabled rule groups, tag / note the
# flagged couriers, apply cool-off tags and write everything to the courier log.
run_fast_track = is_fast_track()
if run_fast_track == 'Yes':
    from datetime import datetime, timedelta

    df, df_s = load_city_settings()
    df_1,df_2,df_3,df_4,df_5,df_s_1,df_s_2,df_s_3,df_s_4 = rule_engine(df,df_s)

    # Saved QueryRunner report ids per rule group ('' = no report yet).
    q_1 = 'EpowqPGsn' # fraud, failed, nro
    q_2 = '' # cancel at restaurant
    q_3 = '' # all day failures
    q_4 = '' # no movement, same location fails
    q_5 = '' # missing items
    q_6 = 'e5kwFFUNB' # cool off logic

    d_1 = d_2 = d_3 = d_4 = d_5 = d_6 = 'presto'

    # Spreadsheet that receives the courier log.
    courier_log_key = '1lWlxD5t49uPEOj_1QOb8W2HdqqRrC_JA-qS8QzBB9gY'

    # Date string passed to every query: 56 days before now (YYYY-MM-DD).
    datestr = str(datetime.now() - timedelta(days=56))[0:10]

    # run_cycle 1 (before 17:00 kernel-local time) makes courier_log rotate the
    # today_* sheets; any later run appends. Hour 17 used to leave run_cycle
    # unset, so courier_log failed with NameError.
    time_now = datetime.now()
    run_cycle = 1 if time_now.hour < 17 else 2

    # (settings frame, report id, database, status label). Disabled groups stay
    # commented out until their report id above is filled in.
    rule_groups = [
        (df_1, q_1, d_1, 'Rules # Fraud, Failed, NRO'),
        # (df_2, q_2, d_2, 'Rules # Cancelation at Rest'),
        # (df_3, q_3, d_3, 'Rules # All day failures'),
        # (df_4, q_4, d_4, 'Rules # No movement, Same location fail'),
        # (df_5, q_5, d_5, 'Rules # missing items'),
    ]

    rd_f = []
    for group_df, group_query, group_db, group_label in rule_groups:
        rd = qb_engine(group_df, group_query, group_db, datestr)
        update_status(rd, group_label, str(datetime.now())[0:19])
        # A failed group is recorded in the status sheet but contributes no rows.
        if rd != "Failed Run":
            rd_f += rd

    rd_6 = co_engine(df,q_6,d_6,datestr)
    update_status(rd_6,'Cool off logic',str(datetime.now())[0:19])

    # Tag and add notes for the rule-group rows.
    rd_f = tag_notes_courier(rd_f)

    # Add cool-off output after tagging: these rows are always tagged and marked Actioned.
    if rd_6 != "Failed Run":
        for row in rd_6:
            tchannel.thrift(populous_service.UserService.createUserTag(row['driver_uuid'], row['tag']))
            row['Status'] = "Actioned"
        rd_f += rd_6

    if len(rd_f) != 0:
        df_f = pd.DataFrame(rd_f)
        df_f['timestamp'] = str(datetime.now())[0:19]
    else:
        # No output from any query: log a single placeholder row so the run is visible.
        # Built directly because DataFrame.append was removed in pandas 2.0.
        placeholder = 'Null query output'
        df_f = pd.DataFrame(
            [{'Status': placeholder, 'city_id': placeholder, 'driver_uuid': placeholder,
              'note': placeholder, 'tag': placeholder, 'timestamp': str(datetime.now())[0:19]}],
            columns=['Status', 'city_id', 'driver_uuid', 'note', 'tag', 'timestamp'])
    courier_log(df_f, courier_log_key)

    # End of execution

10/02/2019 04:13:36 PM [93m Fetching metadata for Report EpowqPGsn [0m
2019-10-02 16:13:36,371 querybuilder_client INFO [93m Fetching metadata for Report EpowqPGsn [0m
10/02/2019 04:13:36 PM [92m Loaded object metadata. [0m
2019-10-02 16:13:36,447 querybuilder_client INFO [92m Loaded object metadata. [0m
10/02/2019 04:13:36 PM [93m Templating query for report EpowqPGsn [0m
2019-10-02 16:13:36,451 querybuilder_client INFO [93m Templating query for report EpowqPGsn [0m
10/02/2019 04:13:36 PM [92m Templated query successfully. [0m
2019-10-02 16:13:36,520 querybuilder_client INFO [92m Templated query successfully. [0m
10/02/2019 04:13:36 PM [93m [Polling] 0a7ffead-31b8-4b3b-8fca-1f9b571edcec [0m
10/02/2019 04:13:36 PM [93m [Status] pending validation [0m
10/02/2019 04:13:37 PM [93m [Status] in validation [0m
10/02/2019 04:13:38 PM [93m [Status] pending execution [0m
10/02/2019 04:13:39 PM [93m [Status] in execution [0m
10/02/2019 04:13:39 PM [93m [External ID] f9

In [13]:
# Disabled: the main cell above already calls courier_log(df_f, ...) on every
# path. Calling it again logged every row twice. When run_cycle == 1 it also
# rotated the today_* history sheets a second time, discarding one day of
# history. It also raised NameError when Fast Track was switched off, because
# df_f was never defined. Uncomment only for a deliberate manual re-log.
# courier_log(df_f,'1lWlxD5t49uPEOj_1QOb8W2HdqqRrC_JA-qS8QzBB9gY')