In [133]:
from utilities.google_client import google_sheet_API
from utilities.athena_client import athena_API
from utilities.slack_client import slack_API
import multiprocessing as mp
import time

In [2]:
# Initialise the three API clients (Google Sheets, Athena, Slack) that the
# cells below reference as module-level globals.
gsheetAPI = google_sheet_API()
athenaAPI = athena_API()
slackAPI = slack_API()

In [50]:
# Retrieve GSheet
# `service` is the handle returned by the Sheets client; it is reused later when
# writing status cells back. `gsheet_df` holds the sheet contents (the main loop
# iterates it with `.iterrows()`, so it is presumably a pandas DataFrame).
service = gsheetAPI.retrieve_gservice()
gsheet_df = gsheetAPI.get_google_sheet(service=service)

In [82]:
# Query
# Load the SQL template from disk; it is filled in per row later via
# `sql_query.format(**row_dict)`.
with open('main_query.sql', 'r') as f:
    sql_query = f.read()

In [92]:
# Below is the inside of the for loop
# For each identifier column, store an extra SQL filter under
# '<identifier>_condition': an "and <identifier> = '<value>'" clause when the
# row carries a value, or an empty string when the cell is blank.
# NOTE(review): the value is interpolated into the SQL text unescaped.
for identifier_id in ('account_id', 'domain_id', 'site_id'):
    row_dict[f'{identifier_id}_condition'] = (
        f'and {identifier_id} = ' + "'" + row_dict[identifier_id] + "'"
        if row_dict[identifier_id] != ''
        else ''
    )

In [93]:
# Copy each hour-bucket threshold column into a short key, wrapped in single
# quotes so it can be dropped into the SQL template as a string literal.
row_dict['threshold_one'] = ''.join(("'", row_dict['threshold_one | 0-7'], "'"))
row_dict['threshold_two'] = ''.join(("'", row_dict['threshold_two | 8-15'], "'"))
row_dict['threshold_three'] = ''.join(("'", row_dict['threshold_three | 16-23'], "'"))

In [106]:
# Get the query
# Fill the SQL template with this row's values, run it through the Athena
# client and keep the first returned record as a plain dict.
# NOTE(review): `[0]` raises IndexError if the query returns no rows — confirm
# the query always yields exactly one record.
results = athenaAPI.get_pandas_df(sql_query.format(**row_dict)).to_dict('records')[0]

In [123]:
# Helper function to generate the looker url string
def generate_looker_url(row_dict):
    """Build the Looker dashboard link for one monitored sheet row.

    Args:
        row_dict: Mapping-like row (anything with ``.get``) providing
            ``account_id``, ``site_id``, ``domain_id`` and ``ilike_url``.

    Returns:
        The dashboard URL as a string, with every query-string value
        percent-encoded and the URL pattern wrapped in ``%25`` (an encoded
        ``%`` wildcard) on both sides.
    """
    # Function-scope import keeps this cell runnable on its own.
    from urllib.parse import quote

    def _encoded(key):
        # str() keeps the previous rendering of a missing value ('None');
        # quote(safe='') escapes every reserved character, not just '/'.
        return quote(str(row_dict.get(key)), safe='')

    # A missing/blank pattern degrades to an empty match instead of raising.
    url_match_formatted = quote(row_dict.get('ilike_url') or '', safe='')
    account_id = _encoded('account_id')
    site_id = _encoded('site_id')
    domain_id = _encoded('domain_id')
    return f'https://analytics.distilnetworks.com/dashboards/618?access_time=168%20hours&account_id={account_id}&site_id={site_id}&domain_id={domain_id}&url_match=%25{url_match_formatted}%25'

In [124]:
def compose_slack_alert(row_idx, row_dict, results):
    """Format the Slack alert text for a row whose request count crossed its threshold.

    Args:
        row_idx: Index of the row in the monitoring sheet (shown in the message).
        row_dict: Mapping-like sheet row; ``domain``, ``ilike_url`` and
            ``multiplier`` are read from it, and it is passed on to
            ``generate_looker_url``.
        results: Mapping with ``requests``, ``threshold`` and
            ``threshold_bucket`` from the query.

    Returns:
        The multi-line Slack message as a string.

    Raises:
        ValueError / TypeError: if ``threshold``, ``requests`` or
            ``multiplier`` cannot be converted to numbers.
    """
    domain = row_dict.get('domain')
    ilike_url = row_dict.get('ilike_url')
    requests = results.get('requests')
    threshold = results.get('threshold')
    multiplier = row_dict.get('multiplier')
    threshold_bucket = results.get('threshold_bucket')
    looker_url = generate_looker_url(row_dict)

    base_threshold = int(threshold)
    # A threshold of 0 has no meaningful percentage increase; without this
    # guard the division raises and the alert itself is never sent.
    if base_threshold:
        percentage_increase = f'{round(int(requests)/base_threshold*100-100, 2)}%'
    else:
        percentage_increase = 'n/a'

    return f'''ALERT! :warning: 
    Domain: _{domain}_ | Url: _{ilike_url}_ | Time: _{time.asctime()}_
    Request count exceeded the threshold:
    ```- Request count: {requests}
    - Threshold: {threshold}
    - Threshold w multiplier: {base_threshold*float(multiplier)}
    - Threshold bucket: {threshold_bucket}
    - Percentage increase: {percentage_increase}```
    <{gsheetAPI.gsheet_link}|Attack monitor sheet> row: {row_idx}
    <{looker_url}|Looker Dashboard>'''

In [131]:
def process_row(row_idx, row_dict):
    """Alert on one sheet row if needed, then record the outcome in the sheet.

    Compares the request count with ``threshold * multiplier``; when exceeded,
    composes and sends a Slack alert. A timestamp and a status message are then
    written starting at cell ``O{row_idx+2}``.

    Args:
        row_idx: Index of the row in the sheet data (offset by 2 to get the
            sheet row number).
        row_dict: Mapping-like sheet row; ``multiplier`` and ``domain`` are
            read here and the row is passed on to ``compose_slack_alert``.

    NOTE(review): ``results``, ``service``, ``slackAPI`` and ``gsheetAPI`` are
    read from module-level globals, so ``results`` is whatever the last query
    cell produced rather than a per-row query result — confirm this is intended.
    """
    # Default status so the sheet update below always has a value, including
    # when the threshold is not exceeded (previously an UnboundLocalError).
    client_message = 'Below threshold'

    # Check logic and send to slack:
    alert_limit = int(results.get('threshold'))*float(row_dict.get('multiplier'))
    if results.get('requests') > alert_limit:
        try:
            message = compose_slack_alert(row_idx, row_dict, results)
            slackAPI.send_message(message)
            client_message = 'Successful'
        except Exception as e:
            # `domain` was never defined in this scope; read it from the row so
            # the error report itself does not fail with a NameError.
            domain = row_dict.get('domain')
            slackAPI.send_message(f"Error for domain {domain}, row {row_idx}, query returned error:\n ```{str(e)}```")
            client_message = str(e)

    # Update Google sheet
    cell_address = f'O{row_idx+2}'
    gsheetAPI.update_sheet(service, cell_address, [time.asctime(), client_message])

In [132]:
# --------- #
# MAIN LOOP #
# --------- #
# Each cycle: reload the sheet, process every row in a worker pool, log timing.
# NOTE(review): the loop restarts immediately with no pause between cycles —
# confirm that polling the sheet back-to-back is intended.
if __name__ == '__main__':
    while True:
        # Retrieve GSheet
        service = gsheetAPI.retrieve_gservice()
        gsheet_df = gsheetAPI.get_google_sheet(service=service)
        write_log(f'Data from Google Sheet retrieved at {time.asctime()}')

        # One (row_idx, row) tuple per sheet row; the full row is passed along
        # because process_row needs most of its fields.
        row_list = list(gsheet_df.iterrows())

        # Multiprocessing to run the queries simultaneously, it has to stay in the main function
        write_log(f'Beginning multiprocessing at {time.asctime()}, running {len(row_list) * 2} queries on {mp.cpu_count()} threads')
        time_start = time.time()
        with mp.Pool(mp.cpu_count()) as pool:
            # starmap spreads the rows over the workers and waits for all of
            # them; pool.apply blocks on every call, which ran the rows one at
            # a time. The return value is deliberately not bound to `results`:
            # process_row reads that module-level name, and overwriting it with
            # a list would break the workers forked in the next cycle.
            pool.starmap(process_row, row_list)

        # Print how long it took for a full cycle
        exec_time = "{:.2f}".format(time.time() - time_start)
        write_log(f'Multiprocessing completed in {exec_time} seconds')

Data from Google Sheet retrieved at Thu Apr 30 13:57:10 2020
Beginning multiprocessing at Thu Apr 30 13:57:10 2020, running 10 queries on 16 threads
0 status                                                   active
name                                 first test attack alerting
domain                                   mercury.worldremit.com
threshold_one | 0-7                                         300
threshold_two | 8-15                                       1550
threshold_three | 16-23                                    1500
multiplier                                                  1.3
account_id                 2e8d6f13-c651-46ca-bfe5-2293e4586ede
domain_id                                                      
site_id                    32261b05-3a41-45df-ae4c-199945da4c96
ilike_url                                           /auth/login
and_condition                                AND action is NULL
note                                test to check if this works
owner            

Process ForkPoolWorker-65:
Process ForkPoolWorker-66:
Process ForkPoolWorker-67:
Process ForkPoolWorker-72:
Process ForkPoolWorker-70:
Process ForkPoolWorker-71:
Process ForkPoolWorker-69:
Process ForkPoolWorker-77:
Process ForkPoolWorker-74:
Process ForkPoolWorker-78:
Process ForkPoolWorker-73:
Process ForkPoolWorker-68:
Process ForkPoolWorker-75:
Process ForkPoolWorker-79:
Process ForkPoolWorker-76:
Process ForkPoolWorker-80:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multip

KeyboardInterrupt: 

In [129]:
# Helper that mirrors every log line to the console and to log.txt
def write_log(line):
    """Print ``line`` and append it, terminated by CR+LF, to ``log.txt``.

    The file is opened relative to the current working directory and is
    created on first use.
    """
    print(line)
    entry = "{}\r\n".format(line)
    with open("log.txt", mode="a+") as log_file:
        log_file.write(entry)

In [150]:
# Current UTC time in asctime format. time.gmtime() supplies the same UTC
# struct_time fields that asctime uses, without depending on `datetime`,
# which is only imported in a later cell.
time.asctime(time.gmtime())

'Fri May  1 12:11:52 2020'

In [149]:
# Scratch cell: inspect the struct_time built from the current UTC datetime.
# NOTE(review): this import sits mid-notebook; consider moving it to the import cell.
from datetime import datetime
datetime.utcnow().timetuple()

time.struct_time(tm_year=2020, tm_mon=5, tm_mday=1, tm_hour=12, tm_min=11, tm_sec=47, tm_wday=4, tm_yday=122, tm_isdst=-1)

In [136]:
# Scratch cell: current UTC time as a struct_time.
time.gmtime()

time.struct_time(tm_year=2020, tm_mon=5, tm_mday=1, tm_hour=12, tm_min=8, tm_sec=56, tm_wday=4, tm_yday=122, tm_isdst=0)

In [146]:
# Scratch cell: format the epoch (gmtime(0)) with asctime.
# NOTE(review): the TypeError recorded below looks like it came from an earlier
# version of this cell; re-run to confirm.
time.asctime(time.gmtime(0))

TypeError: Tuple or struct_time argument required

In [151]:
# Re-create the Sheets service handle (same call as in the retrieval cell above).
service = gsheetAPI.retrieve_gservice()

In [None]:
# Leftover tab-completion stub; the dangling "." was a SyntaxError.
# Displays the service object instead.
service