### <center> The goal of this notebook is to find any URLs accessed within the FedEx network over the past week that could be harmful or otherwise inappropriate.
    
- Step <a href='#imports'>#1</a>: Install and Import Libraries
- Step <a href='#gcp'>#2</a>: GCP Configurations
- Step <a href='#query'>#3</a>: Query Raw URLs
- Step <a href='#preprocess'>#4</a>: Preprocessing of the Raw URLs
- Step <a href='#early_rdap'>#5</a>: Start RDAP early
- Step <a href='#batch_predictions'>#6</a>: Run Batch Prediction on Processed URLs
- Step <a href='#user_count'>#7</a>: User Count Scoring
- Step <a href='#combine'>#8</a>: Combine Outputs from Jaccard and User Count Scoring
- Step <a href='#RDAP'>#9</a>: RDAP Scoring
- Step <a href='#total'>#10</a>: Find Total Score
- Step <a href='#pipeline'>#11</a>: Kubeflow Pipelining
    - Step <a href='#component'>#11.1</a>: Component Initialization
    - Step <a href='#define'>#11.2</a>: Defining the Pipeline
    - Step <a href='#compile'>#11.3</a>: Compiling the Pipeline
    - Step <a href='#submit'>#11.4</a>: Submitting the Pipeline

## <a id='imports'></a>
## <center>Step 1: Install and Import Libraries

In [None]:
%%time

# Install all dependencies from requirements.txt file.
"""
       --user: install python packages in the user directory instead of the system-wide directory.
               Useful if you don't have admin privileges in your notebook.
-q or --quiet: Make command line tools (such as pip) produce less output during execution,
               meaning will not show non-essential or verbose information.
               Will still show errors or major warnings.
"""
%pip install --user --quiet -r requirements.txt
print('Imported requirements')

In [None]:
%%time

import os

import datetime

import argparse

from typing import NamedTuple

from tld import get_tld

import tld

import pandas as pd

import numpy

import crcmod

import termcolor

import regex

import pydot

import objsize

import orjson

from google.cloud import aiplatform

from google.cloud import bigquery

from google.cloud import storage

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions

from apache_beam.options.pipeline_options import SetupOptions

from apache_beam.options.pipeline_options import GoogleCloudOptions

from apache_beam.options.pipeline_options import StandardOptions

from apache_beam.options.pipeline_options import WorkerOptions

from apache_beam.dataframe.convert import to_dataframe

from apitools.base import *

from apache_beam.io.gcp.internal.clients.storage.storage_v1_client import *

from apache_beam.io.gcp.internal.clients.storage.storage_v1_messages import *

import kfp

from kfp import dsl

import kfp

from kfp.components import create_component_from_func

import pandas as pd

import time

import subprocess

import google.cloud.aiplatform as aip

import json

from kfp.v2 import dsl

from kfp.v2 import compiler

from kfp.v2.dsl import (component, Input, Model, Output, Dataset, 
                        Artifact, OutputPath, ClassificationMetrics, 
                        Metrics, InputPath)

from google.cloud import aiplatform_v1, bigquery

from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp

from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp

from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp

from kfp.v2.google.client import AIPlatformClient

from sklearn.preprocessing import MinMaxScaler

import google.cloud.aiplatform as aiplatform

import logging

## <a id='gcp'></a>
## <center>Step 2: GCP Configurations

In [None]:

# Notebook-wide configuration constants for the pipeline cells below.
# Reference notebook:
# https://github.com/outsidenoxvodafone/vertex-ai-pipeline/blob/main/vertex-ai-pipeline.ipynb
# NOTE: this import rebinds the name `datetime` from the module (imported in the
# imports cell) to the `datetime.datetime` class for all later notebook-level code.
from datetime import datetime
# Naive local timestamp (YYYYmmddHHMMSS) captured when this cell is executed.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Set up variables
PROJECT_ID = 'fxs-gccr-sbd-dev-sandbox' # GCP Project ID
REGION = 'us-west1'                     # GCP Region to run pipelines

SERVICE_ACCOUNT='my-bigquery-sa@fxs-gccr-sbd-dev-sandbox.iam.gserviceaccount.com' # Service account
# NOTE(review): the name is misspelled ("PIPEINE") and the value is a local
# filesystem path rather than a gs:// URI - confirm both against the cells that use it.
PIPEINE_ROOT = '/home/jupyter/UMRF-MURLv2/' # location where the pipeline's artifacts are stored
BUCKET_NAME = 'suspicious_user_bucket/PipelineOutputs'    # Google Cloud Storage bucket (plus sub-path) to store pipeline outputs
BIGQUERY_DATASET = 'umrf_malicious_urls'
BIGQUERY_TABLE = 'refined_url_features'

PIPELINE_NAME = 'test_6-9'
JOB_ID = f'Training-pipeline-{TIMESTAMP}'
# NOTE(review): presumably the pipeline caching switch ("CASHING" looks like a
# misspelling of "CACHING"); rename only together with every reference to it.
ENABLE_CASHING = False

# Raise the 'fsspec' logger to WARNING so its DEBUG/INFO messages do not clutter cell outputs.
logging.getLogger('fsspec').setLevel(logging.WARNING)

# TEMPLATE_PATH = '... .json'

# Set the default GCP project
# vertex_ai.init(project=PROJECT_ID, location=REGION)



## <a id='query'></a>
# <center> Step 3: Query Raw URLs
## This is accomplished by a rudimentary query to the CIM table

In [None]:
def query_job() -> None:
    """Query last week's non-whitelisted URLs and stage them in BigQuery.

    For each of the seven days before the most recent Monday, one query is
    run (in a thread pool) against the ``refine_cim_web`` table. Each query
    returns, per URL, the number of distinct users and a comma-separated
    list of those users, for allowed requests whose destination is not on
    the whitelist and that fewer than 50 distinct users visited that day.

    The daily results are combined, exact duplicate rows are dropped, the
    rows are sorted by URL and numbered, and two tables are overwritten in
    ``umrf_murl_v2_results``:

    * ``TEST_query_raw``: ``url``, ``index``, ``week_of``
    * ``TEST_exploded_query_w_users``: ``url``, ``user_count``,
      ``users_list``, ``index``, ``week_of``

    Both tables share the same ``index`` -> ``url`` mapping. A URL seen on
    several days with different user sets keeps one row per distinct
    (url, user_count, users) combination, because only fully identical rows
    are de-duplicated.

    All imports are local so the function stays self-contained.

    Raises:
        RuntimeError: if every daily query was skipped as too expensive, so
            there is nothing to upload.
    """
    import concurrent.futures
    import datetime
    import logging
    import re

    import pandas as pd
    from google.cloud import bigquery

    def dataframe_upload(table, monday, df):
        """Overwrite ``umrf_murl_v2_results.<table>`` with ``df``.

        ``monday`` is kept for call-site compatibility but is not used.
        The previous implementation fired a ``DELETE ... WHERE week_of =
        monday`` job without waiting for it. Because the upload uses
        ``if_exists='replace'`` the whole table is overwritten anyway, and
        the un-awaited DELETE could run concurrently with (or after) the
        load and remove the rows that were just written.
        """
        df.to_gbq(destination_table=f'umrf_murl_v2_results.{table}',
                  project_id='fxs-gccr-sbd-dev-sandbox', chunksize=1_000_000, if_exists='replace')

    def run_query(day, limit):
        """Return one day's URL/user aggregation, or None if it is too costly.

        A dry run estimates the bytes scanned first; the real query is only
        executed when the estimated cost is at most 25 dollars.
        """
        # BigQuery Query Parameters
        my_client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")

        whitelist = [
            "fedex.com", "microsoft.com", "akamaihd.net", "googleapis.com", "amazonaws.com",
            "mcusercontent.com", "google.com", "oracle.com", "facebook.com", "amazon.com",
            "bing.com", "outlook.com", "office.com", "live.com", "en.wikipedia.org",
            "twitter.com", "imdb.com", "instagram.com", "fandom.com", "pinterest.com",
            "nytimes.com", "ebay.com", "espn.com", "etsy.com", "apple.com", "healthline.com",
            "homedepot.com", "mayoclinic.org", "cnn.com", "linkedin.com", "craigslist.org",
            "indeed.com", "nih.gov", "bestbuy.com", "lowes.com", "foxnews.com", "cdc.gov",
            "cvs.com", "irs.gov", "britannica.com", "clevelandclinic.org", "costco.com",
            "medicalnewstoday.com", "mail.yahoo.com", "forbes.com", "ca.gov", "cbssports.com",
            "merriam-webster.com", "macys.com", "allrecipes.com", "finance.yahoo.com",
            "nordstrom.com", "mapquest.com", "cnbc.com", "nike.com", "businessinsider.com",
            "kohls.com", "nypost.com", "mlb.com", "apartments.com", "investopedia.com",
            "expedia.com", "chase.com", "genius.com", "npr.org", "foodnetwork.com", "nba.com",
            "bankofamerica.com", "go.com", "accuweather.com", "hotels.com", "bbc.com",
            "cnet.com", "bleacherreport.com", "dictionary.com", "medlineplus.gov",
            "nbcnews.com", "goodhousekeeping.com", "basketball-reference.com", "kbb.com",
            "att.com", "dailymail.co.uk", "cbsnews.com", "aol.com", "hulu.com", "ikea.com",
            "listcrawler.eu", "newsweek.com", "ign.com", "ny.gov", "dickssportinggoods.com",
            "booking.com", "foursquare.com", "mcdonalds.com", "cosmopolitan.com",
            "nbcsports.com", "doordash.com", "cars.com", "goodreads.com", "carfax.com",
            "bloomberg.com", "hopkinsmedicine.org", "caranddriver.com", "adobe.com",
            "insider.com", "nerdwallet.com", "dominos.com", "youtube.com", "windowsupdate.com",
        ]
        # A destination is whitelisted when it IS a listed domain or a
        # sub-domain of one. The "(?:^|\.)" boundary stops look-alikes such as
        # "evilgo.com" from being whitelisted through "go.com", which the old
        # "^.*go\.com$" form allowed.
        domain_alternatives = "|".join(map(re.escape, whitelist))
        # Bare IPv4 destinations are whitelisted as well. The quantifier braces
        # must be single: this value is substituted INTO the f-string below, and
        # substituted values are not re-parsed, so the old doubled braces reached
        # the regex engine literally and the IP branch never matched.
        # NOTE(review): with this fixed, raw-IP destinations are excluded from the
        # result, as the original pattern evidently intended - confirm that is wanted.
        whitelist = (
            r"(?:^|\.)(?:%s)$" % domain_alternatives
            + r"|^(?:\d{1,3}\.){3}\d{1,3}$"
        )

        my_query = f"""
            SELECT url, COUNT(DISTINCT user) AS user_count, STRING_AGG(DISTINCT user, ', ') as users
            FROM `fxs-entsvcs-eca-ipt-prod-1.fdx_eca_bq_refine_cim_web.refine_cim_web`
            WHERE _timestamp BETWEEN "{day} 00:00:00" AND "{day} 23:59:59"
            AND NOT REGEXP_CONTAINS(dest, r"{whitelist}")
            AND NOT http_method IN ('CONNECT', 'CERTVERIFY')
            AND url IS NOT NULL 
            AND url != 'unknown'
            AND action = 'allowed'
            GROUP BY url
            HAVING user_count < 50
            LIMIT {limit}
            """

        # BigQuery Test Job Setup: a dry run reports bytes scanned without running the query.
        my_config = bigquery.QueryJobConfig()
        my_config.dry_run = True
        my_test = my_client.query(my_query, job_config=my_config)

        gigabytes = my_test.total_bytes_processed/(10**9)
        money = (gigabytes/1024)*5
        logging.info(f"This job will take ~{gigabytes:.2f} gigabytes or ~{money:.2f} dollars to run")

        if money > 25:
            logging.error(f'query for {day} is too expensive, skipping...')
            return None

        # Running the Query
        raw_predicted_urls = my_client.query(my_query).to_dataframe()
        logging.info('did query')

        return raw_predicted_urls

    logging.getLogger().setLevel(logging.INFO)

    # Work out the week to query: the seven days before the most recent Monday.
    today = datetime.date.today()
    most_recent_monday = today - datetime.timedelta(days=today.weekday())
    logging.info(f'Most Recent Monday: {str(most_recent_monday)}')
    previous_monday = most_recent_monday - datetime.timedelta(weeks=1)
    logging.info(f'Previous Monday: {str(previous_monday)}')

    date_list = [most_recent_monday - datetime.timedelta(days=x+1) for x in range(7)]

    # Multithreaded Queries: one per day; skipped days (None) are ignored.
    daily_results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for day in date_list:
            logging.info(f"querying {day}'s urls")
            futures.append(executor.submit(run_query, day, 1_500_000))

        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:
                daily_results.append(result)

    if not daily_results:
        # Previously this surfaced later as an opaque KeyError on the missing 'users' column.
        raise RuntimeError(
            f"No daily query produced results for the week of {previous_monday}; nothing to upload."
        )
    # Concatenate once instead of growing a frame inside the loop.
    urls_w_users = pd.concat(daily_results)

    logging.info("dropping duplicates.")
    urls_w_users.drop_duplicates(inplace=True)

    logging.info("Sorting dataframes.")
    urls_w_users.sort_values(by='url', inplace=True)
    urls_w_users.reset_index(drop=True, inplace=True)

    # Store the users as "a,b,c" (no space after the comma). STRING_AGG yields
    # NULL when a URL has no non-NULL user; treat that as an empty list instead
    # of crashing on the missing value.
    urls_w_users['users_list'] = urls_w_users['users'].fillna('').str.replace(', ', ',', regex=False)
    urls_w_users.drop(columns=['users'], inplace=True)
    urls_w_users['index'] = urls_w_users.index
    urls_w_users['week_of'] = pd.to_datetime(previous_monday)

    # The raw table is the same rows without the user columns, so both tables
    # share one index -> url mapping.
    raw_df = urls_w_users[['url', 'index', 'week_of']].copy()
    logging.info("dataframes finalized.")

    dataframe_upload('TEST_query_raw', previous_monday, raw_df)
    dataframe_upload('TEST_exploded_query_w_users', previous_monday, urls_w_users)

In [None]:
#query_job()

## <a id='preprocess'></a>
# <center> Step 4: Preprocessing of the Raw URLs

In [None]:
def megapreprocess_op() -> None:
    """Engineer URL features for last week's raw URLs and upload model inputs.

    Reads the rows of ``umrf_murl_v2_results.TEST_query_raw`` whose
    ``week_of`` is the previous Monday, splits them into chunks of 750,000
    rows, computes lexical URL features for every chunk in a thread pool,
    min-max scales the numeric features, and overwrites two tables in
    ``umrf_murl_v2_results``: ``refined_model_bq_input`` and
    ``phish_model_bq_input`` (every column is cast to ``str`` first).

    If the query returns no rows, or any chunk fails, an error is logged and
    the process exits with status 1.

    All imports are local so the function stays self-contained.

    NOTE(review): the feature definitions are deliberately left exactly as
    they were, because the resulting columns feed trained models. Points
    worth revisiting together with the models:
      * ``MinMaxScaler`` is fitted separately on each 750k-row chunk, so the
        same raw value can scale differently in different chunks.
      * The IPv4 pattern is anchored at the start of the URL and its octet
        alternatives cover 0-9 and 100-255 only, so two-digit octets do not
        match.
      * ``abnormal_url`` is 1 whenever the parsed hostname is a substring of
        the URL (see the TODO at its call site).
    """
    import datetime
    import concurrent.futures
    from google.cloud import bigquery
    import logging
    import pandas as pd
    import re
    from sklearn.preprocessing import MinMaxScaler
    import sys
    import time
    from tld import get_tld
    from urllib.parse import urlparse
    import tldextract
    import string

    def dataframe_upload(table, monday, df):
        """Overwrite ``umrf_murl_v2_results.<table>`` with ``df`` (``monday`` is unused)."""
        df.to_gbq(destination_table=f'umrf_murl_v2_results.{table}',
                  project_id='fxs-gccr-sbd-dev-sandbox', chunksize=1_000_000, if_exists='replace')

    def megapreprocess(df):
        """Add all feature columns to one chunk and return the scaled frame.

        Returns None (and records the chunk in ``failed_dfs``) if anything
        raises, so the caller can detect the failure by counting results.
        """
        #####
        ##### Feature Engineering
        #####
        def get_domain(url):
            """Return "<registered domain>.<public suffix>" for the URL."""
            parts = tldextract.extract(url)  # parse once; domain and suffix come from the same result
            return parts.domain + '.' + parts.suffix

        def abnormal_url(url):
            """Return 1 if the parsed hostname occurs verbatim in the URL, else 0."""
            hostname = urlparse(url).hostname
            if hostname is None:
                return 0
            return 1 if hostname in url else 0

        def get_path(url):
            """Return the URL's path component, or None (after logging) on failure."""
            try:
                return urlparse(url).path
            except Exception as e:
                logging.error(f" EXCEPTION: {e}")
                logging.error(f"getting path function failed!\tURL: {url}")
                return None

        def letter_count(url):
            """Return the number of alphabetic characters, or None (after logging) on failure."""
            try:
                return sum(1 for ch in url if ch.isalpha())
            except Exception as e:
                logging.error(f" EXCEPTION: {e}")
                logging.error(f"Counting letters failed!\tURL: {url}")
                return None

        def count_subs(url):
            """Return how many dot-separated labels the subdomain has (0 if none)."""
            sub = tldextract.extract(url).subdomain
            # "".split(".") gives [""] (length 1), so the empty subdomain is handled explicitly.
            if sub == "":
                return 0
            return len(sub.split("."))

        def get_port(url):
            """Return the explicit port of the URL, or None if absent or unparsable."""
            try:
                return urlparse(url).port
            except Exception as e:
                logging.error(f" EXCEPTION: {e}")
                logging.error(f"getting port function failed!\tURL: {url}")
                return None

        def port_sus(port):
            """Score a port: None -> 0.5, 80/443 -> 1, other 0-1023 -> 0.5, anything else -> 0."""
            if port is None:
                return 0.5
            if port in (80, 443):
                return 1
            if 0 <= port <= 1023:
                return 0.5
            return 0

        try:
            logging.info("in the thread function")
            df['domain'] = df['url'].apply(get_domain)
            logging.info("Got domains")
            df['path'] = df['url'].apply(get_path)
            logging.info("Got paths")
            df['tld'] = df['url'].apply(lambda i: get_tld(i,fail_silently=True)).fillna('')
            logging.info("Got TLDs")

            last = time.time()

            def log_step(label, step):
                """Log the seconds since the previous step, then the "<step>/33" progress marker."""
                nonlocal last
                current = time.time()
                logging.info(f"{current-last} function run: {label}")
                logging.info(f"{step}/33")
                last = current

            ipv4 = r'(?:^(?:(?:(?:25[0-5])|(?:2[0-4]\d)|(?:\d|1\d{2}))\.){3}(?:(?:25[0-5])|(?:2[0-4]\d)|(?:\d|1\d{2})))'
            ipv6 = r'(?:^(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4})'
            any_ip = f"{ipv4}|{ipv6}"
            df['having_ip_address'] = df['url'].str.contains(any_ip)
            log_step("ip address", 1)

            # TODO: Can be improved upon by looking at multiple characteristics; the greater the number of
            # abnormalities across different checks, the higher a normalized score for the model.
            # This currently returns 1 for a lot of URLs.
            df['abnormal_url'] = df['url'].apply(abnormal_url)
            log_step("abnormal", 2)

            df['count_dot'] = df['url'].str.count(r'\.')
            log_step(".", 3)

            df['count_www'] = df['url'].str.count('www')
            log_step("www", 4)

            df['count_atrate'] = df['url'].str.count('@')
            log_step("@", 5)

            df['count_https'] = df['url'].str.count("https")
            log_step("https", 6)

            shorteners = ["bit.ly", "goo.gl", "shorte.st", "go2l.ink", "x.co", "ow.ly", "t.co", "tinyurl",
                "tr.im", "is.gd", "cli.gs", "yfrog.com", "migre.me", "ff.im", "tiny.cc", "url4.eu",
                "twit.ac", "su.pr", "twurl.nl", "snipurl.com", "short.to", "BudURL.com", "ping.fm",
                "post.ly", "Just.as", "bkite.com", "snipr.com", "fic.kr", "loopt.us", "doiop.com",
                "short.ie", "kl.am", "wp.me", "rubyurl.com", "om.ly", "to.ly", "bit.do", "lnkd.in",
                "db.tt", "qr.ae", "adf.ly", "bitly.com", "cur.lv", "tinyurl.com", "ity.im", "q.gs",
                "po.st", "bc.vc", "twitthis.com", "u.to", "j.mp", "buzurl.com", "cutt.us", "u.bb",
                "yourls.org", "prettylinkpro.com", "scrnch.me", "filoops.info", "vzturl.com", "qr.net",
                "1url.com", "tweez.me", "v.gd", "link.zip.net"]
            # Non-capturing groups match exactly like the old capturing ones but do not
            # trigger pandas' "pattern has match groups" warning in str.contains.
            pattern = r"^(?:%s)" % "|".join(map(re.escape, shorteners))
            df['short_url'] = df['domain'].str.contains(pattern)
            log_step("shortening_service list", 7)

            df['count_http'] = df['url'].str.count("http")
            log_step("http", 8)

            df['count_ques'] = df['url'].str.count(r"\?")
            log_step("?", 9)

            df['count_per'] = df['url'].str.count(r"\%")
            log_step("%", 10)

            df['count_hyphen'] = df['url'].str.count("-")
            log_step("-", 11)

            df['count_equal'] = df['url'].str.count("=")
            log_step("=", 12)

            df['url_length'] = df['url'].str.len()
            log_step("url length", 13)

            df['hostname_length'] = df['domain'].str.len()
            log_step("domain length", 14)

            suspicious_words = ["PayPal", "login", "signin", "bank", "account", "update", "free", "lucky", "service", "bonus", "ebayisapi", "webscr"]
            df['sus_url'] = df['url'].apply(lambda url: any(word in url for word in suspicious_words))
            log_step("sus url with list", 15)

            df['count_digits'] = df['url'].str.count(r"\d")
            log_step("digit count", 16)

            df['count_letters'] = df['url'].apply(letter_count)
            log_step("letter lambda", 17)

            df['count_dir'] = df['path'].str.count('/')
            log_step("/", 18)

            df['count_embed_domain'] = df['path'].str.count("//")
            log_step("embed", 19)

            # Length of the first path segment. The timer is now read after the
            # work; before, it was read first and logged a meaningless duration.
            df['fd_length'] = df['path'].str.split('/').str[1].str.len().fillna(0)
            log_step("fd length", 20)

            df['tld_length'] = df['tld'].str.len()
            log_step("tld length", 21)

            df['double_slash'] = df['url'].str.count("//")
            log_step("double slash", 22)

            df['subdomain_count'] = df['url'].apply(count_subs)
            log_step("subdomain counter", 23)

            # Raw string: same regex as before, without the invalid "\]" string escape.
            pattern = r'[!@#$%^&*()_+,-./:;<=>?@[\]^_`{|}~]'
            df['special_chars'] = df['url'].str.count(pattern)
            log_step("special characters", 24)

            df['ratio_digits'] = df['count_digits'] / df['url_length']
            log_step("ratio digits", 25)

            suspicious_keywords = [
            'PayPal', 'login', 'signin', 'bank', 'account', 'update', 'free', 'lucky', 'service', 'bonus', 'ebayisapi', 'webscr', 'login', 'signin',
            'password', 'secure', 'account', 'verification', 'validate', 'confirm', 'token', 'update', 'registry', 'payment', 'credit', 'transaction', 'admin', 'service',
            'webmaster', 'helpdesk', 'paypal', 'ebay', 'amazon', 'bank', 'wellsfargo', 'chase', 'citi', 'boa', 'fedex', 'microsoft', '.exe', '.zip', '.rar', '.doc', '.xls', '.pdf', 'free',
            'gift', 'promo', 'offer', 'download', '.dll', 'prize', 'reward', 'sweepstakes', 'lottery', 'winner', 'congratulations', 'script', 'stream', 'play', 'game', 'invoke', 'download',
            'cdn', 'media', 'video', 'manga']

            df['sus_words'] = df['url'].apply(lambda url: any(word in url for word in suspicious_keywords))
            log_step("sus url with list", 26)

            risky_extensions = ['.exe', '.swf', '.php']
            pattern = r"(?:%s)" % "|".join(map(re.escape, risky_extensions))
            df['risky_ext'] = df['url'].str.contains(pattern)
            log_step("risky extensions", 27)

            suspicious_TLDs = [
            "top", "tk", "xyz", "tw", "loan", "ga", "ml", "cf", "gq", "club", "online",
            "site", "ltd", "work", "vip", "icu", "pw", "cc", "gdn", "men", "win",
            "space", "fun", "stream", "bid", "review", "trade", "host", "cloud", "date",
            "download", "party", "ink", "science", "racing", "xin", "accountant", "faith",
            "webcam", "xn--fiqs8s", "xn--fiqz9s", "xn--p1acf", "xn--6qq986b3xl", "cricket",
            "tel", "xn--55qx5d", "page", "support", "co.zw", "xn--io0a7i", "gold",
            "xn--80adxhks", "casa", "xn--czru2d", "dev", "business", "xn--3ds443g",
            "gallery", "in.net", "zw", "bd", "ke", "am", "sbs", "quest", "cd", "cyou",
            "rest", "help", "ws", "tokyo", "cam", "cm", "uno", "email", "info", "su",
            "best", "ms", "country", "jetzt", "kim", "mom", "wang", "ninja", "zip",
            "realtor", "christmas", "pro", "sx", "link", "biz", "yokohama", "ooo",
            "ryukyu", "mw", "ci", "bar", "surf", "cn", "fit"
            ]
            pattern = r"^(?:%s)" % "|".join(map(re.escape, suspicious_TLDs))
            df['sus_tld'] = df['tld'].str.contains(pattern)
            log_step("suspicious TLDs", 28)

            # Remove every ASCII letter, digit and punctuation mark; anything left is "non-standard".
            allowed_chars = string.ascii_letters + string.digits + string.punctuation
            translation_table = str.maketrans("", "", allowed_chars)
            df['nonstandard_chars'] = df['url'].str.translate(translation_table) != ""
            log_step("non-standard characters", 29)

            df['port'] = df['url'].apply(get_port)
            log_step("getting port", 30)

            df['has_port'] = ~df['port'].isna()
            log_step("has port binary", 31)

            df['sus_port'] = df['port'].apply(port_sus)
            log_step("suspicious ports", 32)

            suspicious_patterns = ['DROP', 'INSERT', 'DELETE']  # expand as needed
            pattern = r"(?:%s)" % "|".join(map(re.escape, suspicious_patterns))
            df['sus_query'] = df['url'].str.contains(pattern)
            log_step("malformed structures", 33)

            adult_sus_words = [
            'camgirl', 'porn', 'xxx',  'pornhub', 'xvideo', 'xhamster', 'nsfw', 'only','feet', 'kink', 'nude', 'leaked', 'boob', 'boobs', 'lesbian', 'blacked',
            'leaks', 'fap', 'voyuer'
            ]
            pattern = r"(?:%s)" % "|".join(map(re.escape, adult_sus_words))
            df['adult_words'] = df['url'].str.contains(pattern)

            scaler = MinMaxScaler()
            # Non-numeric / identifier columns are set aside during scaling and re-attached
            # afterwards ('tld' included: the refined model input selects it later).
            re_add_columns = df[['url', 'index', 'path', 'domain','week_of','tld']]
            df = df.drop(columns=['url', 'tld', 'index', 'path', 'domain', 'week_of'])
            logging.info('normalizing')
            df = pd.DataFrame(scaler.fit_transform(df), columns=df.columns)
            logging.info('normalizing done. Readding columns.')
            # Give both frames the same 0..n-1 index so the column-wise concat aligns row by row.
            re_add_columns = re_add_columns.reset_index(drop=True)
            df.index = re_add_columns.index
            df = pd.concat([df, re_add_columns], axis=1)
            return df
        except Exception as e:
            # logging.exception keeps the traceback, which the plain error log dropped.
            logging.exception(f" EXCEPTION: {e}")
            logging.error("preprocessing function errored. adding df to list of failures.")
            failed_dfs.append(df)
            return None

    #start of component
    start = time.time()
    logging.getLogger().setLevel(logging.INFO)

    client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
    todayDate = datetime.date.today()
    previous_monday = todayDate + datetime.timedelta(days=-todayDate.weekday(), weeks=-1)
    print(previous_monday)
    query = f'SELECT * FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_query_raw` WHERE week_of = "{previous_monday}"' 
    df = client.query(query).to_dataframe()

    #TESTING CODE,IGNORE, BUT KEEP IN FOR EASE OF ACCESS FOR TESTING
    # df = pd.read_csv('gs://suspicious_user_bucket/data/misc_csvs/raw_df_tester.csv')  
    # df.drop(columns=['user_count'],inplace=True)
    # df['index'] = df.index
    # df['week_of'] = pd.to_datetime(previous_monday)

    logging.info(f"got {len(df)} rows")
    if df.empty:
        # With no chunks, pd.concat([]) below would raise an unhelpful ValueError.
        logging.error(f"No raw URLs found for the week of {previous_monday}. Exiting...")
        sys.exit(1)

    # Split into chunks of at most per_df_rows rows (3.5m rows -> 4 chunks, 3m -> 3).
    # Each chunk is an explicit copy because megapreprocess adds columns to it in place;
    # assigning into a bare iloc slice triggers pandas' SettingWithCopy warning.
    per_df_rows = 750_000
    list_of_dfs = [
        df.iloc[start_index:start_index + per_df_rows].copy()
        for start_index in range(0, len(df), per_df_rows)
    ]

    processed_dfs = []
    failed_dfs = []  # filled by megapreprocess (closure) when a chunk raises
    logging.info("its threading time")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for chunk in list_of_dfs:
            futures.append(executor.submit(megapreprocess, chunk))
            logging.info("submitted df")
        for future in concurrent.futures.as_completed(futures):
            res = future.result()
            if res is not None:
                processed_dfs.append(res)
    logging.info("All Dataframes processed.")
    logging.info(f"{len(processed_dfs)} dfs finished successfully")
    if len(processed_dfs) == len(list_of_dfs):  # all dataframes succeeded
        logging.info("All dataframes succeeded")
        df = pd.concat(processed_dfs)
    else:
        logging.error("Some dataframes failed. Exiting...")
        sys.exit(1)

    df = df.astype('str')

    phish_cols = ["abnormal_url", "count_atrate", "count_digits", "count_dir", "count_dot",
                  "count_embed_domain", "count_equal", "count_http", "count_https", "count_hyphen",
                  "count_letters", "count_per", "count_ques", "count_www", "fd_length",
                  "having_ip_address", "hostname_length", "index", "short_url", "sus_url",
                  "sus_words", "tld_length", "url", "url_length"]
    refined_cols = ["abnormal_url", "adult_words", "count_atrate", "count_digits", "count_dir",
                    "count_dot", "count_embed_domain", "count_equal", "count_https", "count_hyphen",
                    "count_letters", "count_per", "count_ques", "count_www", "domain",
                    "double_slash", "fd_length", "hostname_length", "index", "nonstandard_chars",
                    "path", "ratio_digits", "risky_ext", "special_chars", "subdomain_count",
                    "sus_port", "sus_query", "sus_tld", "sus_words", "tld", "url", "url_length"]

    refined_df = df[refined_cols]
    phish_df = df[phish_cols]

    dataframe_upload('refined_model_bq_input', previous_monday, refined_df)
    dataframe_upload('phish_model_bq_input', previous_monday, phish_df)

    end = time.time()
    timer = f"total time to run: {end-start}"
    logging.info(timer)

In [None]:
#megapreprocess_op()

## <a id='early_rdap'></a>
# <center> Step 5: Start RDAP early

In [None]:
def early_RDAP() -> None:
    """Pre-fill the RDAP registration-date cache while batch predictions run.

    Reads last week's raw URLs from BigQuery, reduces them to registered
    domains, drops the domains already present in the GCS cache file, and
    looks up the remaining ones over RDAP in a thread pool. A watcher thread
    polls Vertex AI batch prediction jobs and sets a shared stop event once
    they are no longer running (or after one hour), which makes the remaining
    lookups return immediately. The updated cache is written back to GCS.

    The cache is a ``{domain: 'YYYY-MM-DD'}`` dict stored as its ``str()``
    form and read back with ``ast.literal_eval``.
    """
    import ast
    import concurrent.futures
    import datetime
    from google.cloud import aiplatform_v1 as aiplatform, bigquery, storage
    import logging
    import os
    import pandas as pd
    import requests
    import threading
    import time
    from urllib.parse import urlparse
    import tldextract

    poll_seconds = 60       # delay between two job-list polls
    max_wait_seconds = 3600  # hard limit for the watcher thread

    def batch_pred_checker():
        """Watch the batch prediction jobs and set ``stop_event`` when done.

        Phase 1 waits until the first two listed jobs are both running,
        phase 2 waits until that is no longer the case. Either phase gives
        up one hour after ``start``. Always returns True.
        """
        def check_batch_preds(response):
            """Return True when the first two listed jobs are both RUNNING.

            NOTE(review): this presumably relies on the API listing the most
            recent jobs first - confirm.
            """
            running = 0
            for position, job in enumerate(response):
                print(f'Job status: {job.state.name}')
                if position == 2:
                    # Only the first two jobs are considered.
                    logging.info('not done yet, waiting 1 minute')
                    return False
                if job.state.name == 'JOB_STATE_RUNNING':
                    running += 1
                if running == 2:
                    return True
            return False

        def timed_out():
            """Set the stop event and return True once the time limit is hit."""
            if (time.time() - start) >= max_wait_seconds:
                print("batch preds are bugged, or taking longer than an hour, ending program")
                stop_event.set()
                return True
            return False

        client = aiplatform.JobServiceClient(
            client_options={"api_endpoint": f"{location}-aiplatform.googleapis.com"}
        )
        parent = f"projects/{project_id}/locations/{location}"

        # Phase 1: wait for the batch predictions to start.
        while not stop_event.is_set():
            if check_batch_preds(client.list_batch_prediction_jobs(parent=parent)):
                break
            if timed_out():
                break
            # Event.wait sleeps like time.sleep but wakes up as soon as the
            # main thread sets the stop event, so shutdown is not delayed.
            stop_event.wait(poll_seconds)

        # Phase 2: wait until the two jobs are no longer both running.
        while not stop_event.is_set():
            if not check_batch_preds(client.list_batch_prediction_jobs(parent=parent)):
                stop_event.set()
                break
            if timed_out():
                break
            stop_event.wait(poll_seconds)

        stop_event.set()
        return True

    def read_dict_from_gcs(blob):
        """Download ``blob`` as text and parse it as a Python dict literal."""
        file_content = blob.download_as_text()
        return ast.literal_eval(file_content)

    def upload_dict_to_gcs(dictionary, bucket_name, destination_file_name):
        """Write ``str(dictionary)`` to ``bucket_name/destination_file_name``."""
        dictionary_string = str(dictionary)
        gcs_client = storage.Client()
        target_bucket = gcs_client.get_bucket(bucket_name)
        target_blob = target_bucket.blob(destination_file_name)
        target_blob.upload_from_string(dictionary_string, content_type='text/plain')
        print(f"Dictionary uploaded to GCS bucket {bucket_name} as {destination_file_name}.")

    def get_basic_rdap(attempts):
        """Fetch the IANA RDAP bootstrap JSON, retrying up to ``attempts`` times.

        Returns the parsed JSON, or None when every attempt failed.
        """
        rdap_bootstrap_url = "https://data.iana.org/rdap/dns.json"

        # TODO: No proxy's are being exported, so no proxy is ever used.
        # Get proxy settings from environment variables (if there are any)
        proxies = {
            'http': os.environ.get('http_proxy'),
            'https': os.environ.get('https_proxy')
        }
        for i in range(attempts):
            try:
                bootstrap_data = requests.get(rdap_bootstrap_url, proxies=proxies, timeout=10).json()
                logging.info(f'RDAP Successful {i}')
                return bootstrap_data
            except requests.exceptions.Timeout:
                logging.warning("Timeout error fetching RDAP bootstrap data.")
            except requests.exceptions.TooManyRedirects:
                logging.warning("Too many redirects when fetching RDAP bootstrap data.")
            except requests.exceptions.RequestException as e:
                logging.warning(f"Error fetching RDAP bootstrap data: {e}")
            except ValueError:
                logging.warning("Invalid JSON received from RDAP bootstrap service.")
        return None

    def get_rdap_url(domain, bootstrap_data):
        """Return the first RDAP server URL registered for ``domain``'s TLD.

        Each ``services`` entry is ``[[tld, ...], [server_url, ...]]``. The
        match is done on a label boundary: a bare ``endswith`` would let
        ``example.com`` match the TLD ``om``.
        """
        candidate = domain.lower()
        for entry in bootstrap_data['services']:
            for domain_pattern in entry[0]:
                if candidate == domain_pattern or candidate.endswith('.' + domain_pattern):
                    return entry[1][0]
        return None

    def domain_getter(url):
        """Return ``<registered domain>.<public suffix>`` for ``url``."""
        parts = tldextract.extract(url)
        return parts.domain + '.' + parts.suffix

    def domain_func(domain, bootstrap_data):
        """Look up one domain over RDAP and cache its registration date.

        Performs at most one HTTP request per call and returns without doing
        anything once the stop event is set.
        """
        nonlocal already_cached, failures, request_counter

        if stop_event.is_set():
            return

        # If the domain starts with 'www.', remove the prefix
        if domain.startswith('www.'):
            domain = domain[4:]

        if domain in domain_cache:
            with counter_lock:
                already_cached += 1
            return

        rdap_url = get_rdap_url(domain, bootstrap_data)
        if not rdap_url:
            return

        base_url = rdap_url if rdap_url.endswith("/") else f"{rdap_url}/"
        rdap_info_url = f"{base_url}domain/{domain}"

        try:
            # The semaphore caps the number of in-flight RDAP requests.
            with rdap_semaphore:
                with counter_lock:
                    request_counter += 1
                    sent = request_counter
                if sent % 1000 == 0:
                    logging.info(f"{sent} done {time.time()}")
                response = requests.get(rdap_info_url, timeout=10)
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching RDAP info for '{domain}': {e}")
            return

        if response.status_code != 200:
            logging.error(f"Error fetching RDAP info for '{domain}': HTTP {response.status_code}. Reason: {response.reason}")
            with counter_lock:
                failures += 1
                failure_total = failures
            logging.info(f'Failures: {failure_total}')
            return

        try:
            payload = response.json()
        except ValueError as e:
            logging.error(f"Error fetching RDAP info for '{domain}': {e}")
            return

        if not isinstance(payload, dict) or 'events' not in payload:
            return

        for event in payload['events']:
            if event.get('eventAction') != 'registration':
                continue
            try:
                # `datetime` is the module here, so the class must be spelled out.
                creation_date = datetime.datetime.strptime(event['eventDate'][0:10], '%Y-%m-%d')
            except (KeyError, TypeError, ValueError) as e:
                logging.warning(f"Unparseable registration date for '{domain}': {e}")
                continue
            domain_cache[domain] = creation_date.strftime('%Y-%m-%d')

    # Start of Component
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Starting")
    start = time.time()
    stop_event = threading.Event()
    counter_lock = threading.Lock()  # guards the three counters below
    failures = 0
    already_cached = 0
    request_counter = 0
    project_id = "fxs-gccr-sbd-dev-sandbox"
    location = "us-west1"
    t1 = threading.Thread(target=batch_pred_checker)
    t1.start()
    logging.info("batch checker started")

    try:
        cache_bucket = "suspicious_user_bucket"
        cache_file_name = "rdap_cache/rdap_domains.txt"
        client = storage.Client()
        bucket = client.get_bucket(cache_bucket)
        blob = bucket.blob(cache_file_name)
        domain_cache = {}
        if blob.exists():
            domain_cache = read_dict_from_gcs(blob)
        logging.info("got cache")

        todayDate = datetime.date.today()
        previous_monday = todayDate + datetime.timedelta(days=-todayDate.weekday(), weeks=-1)
        client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
        query = f'SELECT * FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_query_raw` WHERE week_of = "{previous_monday}"'
        df = client.query(query).to_dataframe()
        df_len = len(df)

        logging.info("getting domains")
        df['domain'] = df['url'].apply(domain_getter)
        logging.info("got domains")
        # Only domains that are not cached yet need a lookup.
        domains = set(df['domain']) - set(domain_cache.keys())
        logging.info(len(domains))

        bootstrap_data = get_basic_rdap(attempts=10)
        if bootstrap_data is None:
            logging.critical("PLEASE HELP RDAP FAILED 10 TIMES IN A ROW THIS IS REALLY REALLY BAD")
            return

        rdap_semaphore = threading.Semaphore(4)
        logging.info("Starting threads")

        futures = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for domain in domains:
                if stop_event.is_set():  # If batch predictions are done, stop creating tasks
                    break
                futures.append(executor.submit(domain_func, domain, bootstrap_data))

        # The executor has shut down, so every future is finished; surface
        # worker exceptions instead of losing them silently.
        for future in futures:
            error = future.exception()
            if error is not None:
                logging.error(f"RDAP lookup task raised: {error!r}")

        # Call the function to upload the dictionary to GCS
        upload_dict_to_gcs(domain_cache, cache_bucket, cache_file_name)
    finally:
        # Always release the watcher thread, including on early return or error.
        stop_event.set()

    end = time.time()
    timer = f"total time to run: {end-start}"
    logging.info(timer)
    logging.info(f"Requests sent: {request_counter}")
    logging.info(f'Failures: {failures}')
    logging.info(f'Already cached: {already_cached}')
    logging.info(f'DF length: {df_len}')
    failure_rate = failures / df_len if df_len else 0.0
    logging.info(f'Failure rate: {failure_rate}')

In [None]:
#early_RDAP()

## <a id='batch_predictions'></a>
# <center> Step #6: Run Batch Prediction on Processed URLs

The cell below does a lot.

Here's a breakdown:

First, we get the models for both PhishPoint and murl.

Next, we set up batch jobs to run for both models (this is where the predictions of what is bad or good are made).

Then we get the table IDs from the output of the batch jobs (this is where the data was sent after the predictions).

Then we take the score from one of the dataframes and add it to the dataframe built from the other score table.

That dataframe, containing both scores, is the end result.

In [None]:
# Two hand-maintained lists of feature-column names.
# NOTE(review): the names suggest "columns missing from the refined / phish
# model inputs" - confirm against the column lists used in preprocessing.
not_in_refined = [
    'special_chars',
    'nonstandard_chars',
    'risky_ext',
    'ratio_digits',
    'double_slash',
    'sus_query',
    'sus_tld',
    'adult_words',
    'sus_port',
    'subdomain_count',
]

not_in_phish = [
    'sus_words',
]


In [None]:
# BigQuery script text that (re)creates the two feature-attribution input
# tables by joining a `DistinctURLs` table onto the phish and refined model
# input tables. This cell only builds the string; it does not run it.
# NOTE(review): the string never creates `DistinctURLs` and ends with `END;`
# without a matching `BEGIN`, so it does not look runnable as written.
my_query =f"""
                        -- Create the phish table using the DistinctURLs temporary table
                        CREATE OR REPLACE TABLE `fxs-gccr-sbd-dev-sandbox.PHISH_FEATURE_ATTR.current_feature_attribute_phish` AS
SELECT d.url, 
       p.url AS old_url,
       p.having_ip_address AS having_ip_address,
       p.abnormal_url AS abnormal_url,
       p.count_dot AS count_dot,
       p.count_www AS count_www,
       p.count_atrate AS count_atrate,
       p.count_https AS count_https,
       p.short_url AS short_url,
       p.count_http AS count_http,
       p.count_ques AS count_ques,
       p.count_per AS count_per,
       p.count_hyphen AS count_hyphen,
       p.count_equal AS count_equal,
       p.url_length AS url_length,
       p.hostname_length AS hostname_length,
       p.sus_url AS sus_url,
       p.count_digits AS count_digits,
       p.count_letters AS count_letters,
       p.count_dir AS count_dir,
       p.count_embed_domain AS count_embed_domain,
       p.fd_length AS fd_length,
       p.tld_length AS tld_length,
       p.index AS index

                            FROM DistinctURLs d 
                            JOIN `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.phish_model_bq_input` p
                            ON d.url = p.url;

                        -- Create the refined table using the DistinctURLs temporary table
                        CREATE OR REPLACE TABLE `fxs-gccr-sbd-dev-sandbox.REFINED_FEATURE_ATTR.current_feature_attribute_refined` AS
SELECT d.url, 
       p.abnormal_url AS abnormal_url,
       p.count_dot AS count_dot,
       p.count_www AS count_www,
       p.count_atrate AS count_atrate,
       p.count_https AS count_https,
       p.count_ques AS count_ques,
       p.count_per AS count_per,
       p.count_hyphen AS count_hyphen,
       p.count_equal AS count_equal,
       p.url_length AS url_length,
       p.hostname_length AS hostname_length,
       p.sus_words AS sus_words,
       p.count_digits AS count_digits,
       p.count_letters AS count_letters,
       p.count_dir AS count_dir,
       p.count_embed_domain AS count_embed_domain,
       p.fd_length AS fd_length,
       p.special_chars AS special_chars,
       p.nonstandard_chars AS nonstandard_chars,
       p.risky_ext AS risky_ext,
       p.ratio_digits AS ratio_digits,
       p.double_slash AS double_slash,
       p.sus_query AS sus_query,
       p.sus_tld AS sus_tld,
       p.adult_words AS adult_words,
       p.sus_port AS sus_port,
       p.ip_address AS ip_address,
       p.subdomain_count AS subdomain_count,
       p.url AS old_url,
       p.index AS index,
       p.domain AS domain,
       p.path AS path,
       p.tld AS tld
                            FROM DistinctURLs d 
                            JOIN `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.refined_model_bq_input` p
                            ON d.url = p.url;

                        -- Drop the temporary table
                        DROP TABLE DistinctURLs;
                        END;
            """

In [None]:
# Scratch values for the feature-attribution batch jobs: output dataset URI
# mapped to a model label, plus the BigQuery input tables.
# NOTE(review): the input list is ordered refined-then-phish while the
# mapping is ordered phish-then-refined - confirm before pairing by position.
jobs_data = {
    "bq://fxs-gccr-sbd-dev-sandbox.PHISH_FEATURE_ATTR": 'phishpoint_model',
    "bq://fxs-gccr-sbd-dev-sandbox.REFINED_FEATURE_ATTR": 'refined_model',
}
bq_input_uris = [
    'bq://fxs-gccr-sbd-dev-sandbox.REFINED_FEATURE_ATTR.current_feature_attribute_refined',
    'bq://fxs-gccr-sbd-dev-sandbox.PHISH_FEATURE_ATTR.current_feature_attribute_phish',
]

# Show both input tables, one per line.
print(bq_input_uris[0], bq_input_uris[1], sep="\n")

# for table in jobs_data:
#                 print(table)
#                 break
#                 model_name = jobs_data[table]
#                 bq_output_uri = table

#                 # Make batch prediction request to the model
#                 batch_prediction_job = {
#                     "display_name": "batch_prediction_job",
#                     "model": model_name,
#                     "input_config": {
#                         "instances_format": "bigquery",
#                         "bigquery_source": {"input_uri": bq_input_uris[i]}
#                     },
#                     "output_config": {
#                         "predictions_format": "bigquery",
#                         "bigquery_destination": {"output_uri": bq_output_uri},
#                     },
#                     "dedicated_resources": {
#                         "machine_spec": {
#                             "machine_type": "n1-standard-8"
#                         },
#                         "starting_replica_count": 20,
#                         "max_replica_count": 200
#                     },
#                     "explanation_spec": {  # This is the additional field for feature attributions
#                         "parameters": {
#                             "sampled_shapley_attribution": {  # Using Sampled Shapley feature attribution
#                                 "path_count": 10  # The number of feature permutations for Shapley values
#                             }
#                         }
#                     },
#                     "generate_explanation": True  # Setting this to true to enable explanations
#                 }
#                 logging.info(f"batch preds made for {table}")

In [None]:
def run_batch_preds() -> None:
    """Combine the latest phish/refined batch prediction scores and upload them.

    Looks up the two Vertex AI models by display name, finds the most recently
    created table in the ``PHISHING_BATCH`` and ``REFINED_BATCH`` datasets,
    joins their high-confidence scores, starts the feature-attribution batch
    prediction jobs, and appends the combined scores (stamped with the previous
    Monday as ``week_of``) to ``umrf_murl_v2_results.TEST_batch_predictions_output``.

    Raises:
        ValueError: if a model, an output dataset, or an output table is missing.
    """
    import datetime
    from google.cloud import aiplatform_v1, bigquery, storage
    import logging
    import pandas as pd
    import time

    def get_models(my_type):  # my_type = [PhishPoint, Model_Finale]
        """Return the resource name of the model whose display name is ``my_type``.

        When several models share the display name, the last one listed wins.
        """
        api_endpoint = f"{location}-aiplatform.googleapis.com"
        client = aiplatform_v1.services.model_service.ModelServiceClient(client_options={"api_endpoint": api_endpoint})
        model_name = None

        for model in client.list_models(parent=f"projects/{project_id}/locations/{location}"):
            if model.display_name == my_type:
                model_name = model.name

        if model_name is None:
            raise ValueError(f"Model with display name {my_type} not found.")

        return model_name

    def batch_jobs(bq_input_uri, jobs):  # jobs keys = [PHISHING_BATCH, REFINED_BATCH]
        """Submit one batch prediction job per entry in ``jobs`` and wait for them.

        ``jobs`` maps an output dataset name to a model resource name. The
        input table is chosen per job below; the ``bq_input_uri`` argument is
        accepted for call compatibility but is not used.

        Returns True once every job succeeded; raises ValueError as soon as a
        job is seen in a failed, cancelled or expired state.
        """
        api_endpoint = f"{location}-aiplatform.googleapis.com"
        client = aiplatform_v1.services.job_service.JobServiceClient(client_options={"api_endpoint": api_endpoint})

        responses = []
        for job, model_name in jobs.items():
            if job == 'PHISHING_BATCH':
                output_uri = 'bq://fxs-gccr-sbd-dev-sandbox.PHISHING_BATCH'
                input_uri = 'bq://fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.phish_model_bq_input'
            else:
                output_uri = 'bq://fxs-gccr-sbd-dev-sandbox.REFINED_BATCH'
                input_uri = 'bq://fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.refined_model_bq_input'

            batch_prediction_job = {
                "display_name": "batch_prediction_job",
                "model": model_name,
                "input_config": {
                    "instances_format": "bigquery",
                    "bigquery_source": {"input_uri": input_uri}
                },
                "output_config": {
                    "predictions_format": "bigquery",
                    "bigquery_destination": {"output_uri": output_uri},
                },
                "dedicated_resources": {
                    "machine_spec": {
                        "machine_type": "n1-standard-8"
                    },
                    "starting_replica_count": 20,
                    "max_replica_count": 200
                }
            }

            responses.append(
                client.create_batch_prediction_job(
                    parent=f"projects/{project_id}/locations/{location}",
                    batch_prediction_job=batch_prediction_job
                )
            )

        # States after which a job will never succeed. EXPIRED is included so
        # that an expired job cannot keep this loop polling forever.
        dead_states = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_EXPIRED"}
        while True:
            finished = 0
            failed = False
            for response in responses:
                state_name = client.get_batch_prediction_job(name=response.name).state.name
                logging.info(state_name)

                if state_name == "JOB_STATE_SUCCEEDED":
                    logging.info("Batch prediction job finished successfully.")
                    finished += 1
                elif state_name in dead_states:
                    logging.error(f"Batch prediction job failed with state {state_name}")
                    failed = True

            if failed:
                raise ValueError("Batch Preds did not run fully")
            if finished == len(responses):
                return True
            time.sleep(60)

    def get_tableids(dataset_id):  # dataset_id = [PHISHING_BATCH, REFINED_BATCH]
        """Return the id of the most recently created table in ``dataset_id``.

        Raises:
            ValueError: if the dataset does not exist in the project or holds
                no tables.
        """
        client = bigquery.Client()
        full_dataset = None
        for dataset in client.list_datasets(project=project_id):
            if dataset.dataset_id == dataset_id:
                dataset_ref = client.dataset(dataset.dataset_id, project=project_id)
                logging.info(str(dataset_ref))
                full_dataset = client.get_dataset(dataset_ref)
                logging.info(str(full_dataset))
                break

        if full_dataset is None:
            raise ValueError(f"Dataset {dataset_id} not found in project {project_id}.")

        sorted_tables = sorted(
            client.list_tables(full_dataset),
            key=lambda table: table.created or table.modified,
            reverse=True,
        )
        if not sorted_tables:
            raise ValueError(f"Dataset {dataset_id} contains no tables.")

        return sorted_tables[0].table_id

    def return_df(phish_table, refined_table, jobs_data):
        """Join the two prediction tables and kick off the explanation jobs.

        ``jobs_data`` maps a BigQuery output dataset URI to a model resource
        name, phish first and refined second. Returns the combined score
        dataframe with columns index, url, phish_score and murl_score.
        """

        def make_feature_inputs():
            """Submit the feature-attribution (Sampled Shapley) batch jobs.

            NOTE: the query that used to rebuild the two
            ``current_feature_attribute_*`` input tables (top and bottom 1000
            URLs by score from each batch table, joined onto the model input
            tables) was disabled, so the jobs read those tables as they
            currently exist. The jobs are submitted but not awaited.
            """
            logging.info("making feature attr tables....")

            api_endpoint = f"{location}-aiplatform.googleapis.com"
            client = aiplatform_v1.services.job_service.JobServiceClient(client_options={"api_endpoint": api_endpoint})
            # Same order as the keys of jobs_data: phish, then refined.
            bq_input_uris = [
                'bq://fxs-gccr-sbd-dev-sandbox.PHISH_FEATURE_ATTR.current_feature_attribute_phish',
                'bq://fxs-gccr-sbd-dev-sandbox.REFINED_FEATURE_ATTR.current_feature_attribute_refined',
            ]
            print('starting making batch preds')

            for (table, model_name), input_uri in zip(jobs_data.items(), bq_input_uris):
                batch_prediction_job = {
                    "display_name": "batch_prediction_job",
                    "model": model_name,
                    "input_config": {
                        "instances_format": "bigquery",
                        "bigquery_source": {"input_uri": input_uri}
                    },
                    "output_config": {
                        "predictions_format": "bigquery",
                        "bigquery_destination": {"output_uri": table},
                    },
                    "dedicated_resources": {
                        "machine_spec": {
                            "machine_type": "n1-standard-8"
                        },
                        "starting_replica_count": 20,
                        "max_replica_count": 200
                    },
                    "explanation_spec": {  # This is the additional field for feature attributions
                        "parameters": {
                            "sampled_shapley_attribution": {  # Using Sampled Shapley feature attribution
                                "path_count": 10  # The number of feature permutations for Shapley values
                            }
                        }
                    },
                    "generate_explanation": True  # Setting this to true to enable explanations
                }

                client.create_batch_prediction_job(
                    parent=f"projects/{project_id}/locations/{location}",
                    batch_prediction_job=batch_prediction_job
                )
                logging.info(f"batch preds made for {table}")

        def get_predicted_urls():
            """Build the score-join query, log a dry-run cost estimate, and
            return ``(client, query)`` for the caller to execute."""
            my_client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
            my_query = f"""SELECT 
            t1.index as index,  
            t1.url as url,
            t1.score as phish_score,
            t2.score as murl_score
            FROM (
            SELECT *
                FROM `fxs-gccr-sbd-dev-sandbox.PHISHING_BATCH.{phish_table}`,
                UNNEST(predicted_type_code.scores) as score
                WHERE score >= .995) 
                t1
                JOIN (
                    SELECT *
                    FROM `fxs-gccr-sbd-dev-sandbox.REFINED_BATCH.{refined_table}`,
                    UNNEST(predicted_type_code.scores) as score
                    WHERE score >= .995) 
                    t2
                    ON 
                    t1.index = t2.index AND
                    t1.url = t2.url 
                    ORDER BY t1.index"""

            # Dry run: reports the bytes the query would scan without running it.
            my_config = bigquery.QueryJobConfig()
            my_config.dry_run = True
            my_test = my_client.query(my_query, job_config=my_config)
            logging.info(f"This job will take ~{my_test.total_bytes_processed/(10**9):.2f} gigabytes or {((my_test.total_bytes_processed/(10**9))/1024)*5:.2f} dollars to run")

            return my_client, my_query

        # test the query, check and make sure it doesn't cost $13.7 billion
        predicted_urls_client, predicted_urls_query = get_predicted_urls()
        logging.info("Combining dfs....")
        df = predicted_urls_client.query(predicted_urls_query).to_dataframe()
        # Keeps every second row, originally because BigQuery returned one row
        # for the malicious score and one for the benign score.
        # NOTE(review): with the `score >= .995` filter each URL presumably
        # contributes at most one row, so this may now drop valid rows - confirm.
        # .copy() makes the result an independent frame so that adding columns
        # later does not write into a slice of the query result.
        df = df.iloc[1::2].copy()
        logging.info("Combined!")

        logging.info("Making feature attribution batch pred tables....")
        make_feature_inputs()
        logging.info("Done....!")

        return df

    start = time.time()
    logging.getLogger().setLevel(logging.INFO)
    project_id = 'fxs-gccr-sbd-dev-sandbox'
    todayDate = datetime.date.today()
    previous_monday = todayDate + datetime.timedelta(days=-todayDate.weekday(), weeks=-1)
    location = 'us-west1'
    phishpoint_model = get_models("PhishPoint_WIndex")
    refined_model = get_models("Repurged_Scaled")
    logging.info("got models")

    temp_table = "fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_megapreprocess_output_temp"

    # NOTE: submitting fresh batch prediction jobs is currently disabled, so
    # the steps below read whichever tables are newest in the output datasets.
    # batch_jobs(f"bq://{temp_table}", {"PHISHING_BATCH":phishpoint_model, "REFINED_BATCH":refined_model})

    logging.info("getting phish id")
    most_recent_phish_id = get_tableids("PHISHING_BATCH")
    logging.info("got phish id")
    logging.info("phish done!")

    logging.info("getting refined id")
    most_recent_refined_id = get_tableids("REFINED_BATCH")
    logging.info("got refined id")

    scaled_df = return_df(
        most_recent_phish_id,
        most_recent_refined_id,
        {
            "bq://fxs-gccr-sbd-dev-sandbox.PHISH_FEATURE_ATTR": phishpoint_model,
            "bq://fxs-gccr-sbd-dev-sandbox.REFINED_FEATURE_ATTR": refined_model,
        },
    )
    logging.info("got combined df and started batch preds for feature ")

    scaled_df['week_of'] = pd.to_datetime(previous_monday)
    scaled_df.to_gbq(destination_table='umrf_murl_v2_results.TEST_batch_predictions_output',
        project_id='fxs-gccr-sbd-dev-sandbox', if_exists='append')

    end = time.time()
    timer = f"total time to run: {end-start}"
    logging.info(timer)

    return

In [None]:
#run_batch_preds()

## <a id='user_count'></a>
# <center> Step #7: User Count Scoring

In [None]:
def get_user_count_scoring() -> None:
    """Score last week's URLs by how few users visited them and upload the result.

    Reads the rows of ``TEST_exploded_query_w_users`` whose ``week_of`` is the
    Monday of the previous week, min-max scales ``user_count`` into [0, 1] and
    stores ``1 - scaled`` as ``user_count_score`` (the row with the smallest
    ``user_count`` scores 1, the largest scores 0).  The frame is then written
    to ``TEST_user_scoring``.
    """
    import datetime
    from google.cloud import bigquery, storage
    import logging
    from sklearn.preprocessing import MinMaxScaler
    import pandas as pd
    import time

    def dataframe_upload(table, monday, df):
        """Clear ``monday``'s rows from ``table`` and then upload ``df`` to it."""
        client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
        query = f"DELETE FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.{table}` WHERE week_of = '{monday}'"
        try:
            # Wait for the DELETE to finish: left running in the background it can
            # execute after the upload below and remove the freshly written rows.
            client.query(query).result()
        except Exception as e:
            # Best effort, as before (e.g. the table does not exist on a first run).
            logging.warning(f"Could not clear {table} for week {monday}: {e}")
        # NOTE(review): if_exists='replace' rewrites the whole table, so rows of
        # earlier weeks are dropped and the DELETE above is redundant -- confirm
        # whether 'append' was intended.
        df.to_gbq(destination_table=f'umrf_murl_v2_results.{table}',
                project_id='fxs-gccr-sbd-dev-sandbox', chunksize=1_000_000, if_exists='replace')

    start = time.time()
    logging.getLogger().setLevel(logging.INFO)

    client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
    todayDate = datetime.date.today()
    # Monday of the week before the current one.
    previous_monday = todayDate + datetime.timedelta(days=-todayDate.weekday(), weeks=-1)
    query = f"SELECT * FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_exploded_query_w_users` WHERE week_of = '{previous_monday}'"
    df = client.query(query).to_dataframe()
    df = df.fillna(0)

    # Invert the scaled count so that rarely visited URLs receive the higher score.
    scaler = MinMaxScaler(feature_range=(0, 1))
    normalized_values = 1 - scaler.fit_transform(df[['user_count']])
    df['user_count_score'] = normalized_values
    # Assign the result back instead of calling fillna(inplace=True) on the column:
    # the chained in-place form does not update the frame under pandas Copy-on-Write.
    df['user_count_score'] = df['user_count_score'].fillna(0)

    dataframe_upload('TEST_user_scoring', previous_monday, df)

    end = time.time()
    timer = f"total time to run: {end-start}"
    logging.info(timer)
    

In [None]:
#get_user_count_scoring()

## <a id='combine'></a>
# <center> Step #8: Combine Outputs from Batch Predictions and User Count Scoring

In [None]:
def combine_user_dfs() -> None:
    """Attach last week's user-count columns to the batch-prediction rows and upload.

    Loads the previous week's rows of ``TEST_batch_predictions_output`` and of
    ``TEST_user_scoring``, copies ``user_count_score``, ``user_count`` and
    ``users_list`` from the latter onto the former, casts ``index`` to int and
    writes the combined frame to ``TEST_combine_dfs``.
    """
    import datetime
    from google.cloud import bigquery, storage
    import logging
    import pandas as pd
    import time

    def dataframe_upload(table, monday, df):
        """Clear ``monday``'s rows from ``table`` and then upload ``df`` to it."""
        client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
        query = f"DELETE FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.{table}` WHERE week_of = '{monday}'"
        try:
            # Wait for the DELETE to finish: left running in the background it can
            # execute after the upload below and remove the freshly written rows.
            client.query(query).result()
        except Exception as e:
            # Best effort, as before (e.g. the table does not exist on a first run).
            logging.warning(f"Could not clear {table} for week {monday}: {e}")
        # NOTE(review): if_exists='replace' rewrites the whole table, so rows of
        # earlier weeks are dropped and the DELETE above is redundant -- confirm
        # whether 'append' was intended.
        df.to_gbq(destination_table=f'umrf_murl_v2_results.{table}',
                project_id='fxs-gccr-sbd-dev-sandbox', chunksize=1_000_000, if_exists='replace')

    start = time.time()
    logging.getLogger().setLevel(logging.INFO)

    client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
    todayDate = datetime.date.today()
    # Monday of the week before the current one.
    previous_monday = todayDate + datetime.timedelta(days=-todayDate.weekday(), weeks=-1)

    # batch predictions df
    batch_query = f"SELECT * FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_batch_predictions_output` WHERE week_of = '{previous_monday}'"
    main_df = client.query(batch_query).to_dataframe()

    # user score df
    user_score_query = f"SELECT * FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_user_scoring` WHERE week_of = '{previous_monday}'"
    second_df = client.query(user_score_query).to_dataframe()

    user_columns = ['user_count_score', 'user_count', 'users_list']
    main_df['index'] = main_df['index'].astype('int')

    if 'index' in second_df.columns:
        # Neither query has an ORDER BY, so row N of one result is not guaranteed to
        # describe the same URL as row N of the other.  Match the rows on the shared
        # 'index' key instead of on their position in each result.
        user_lookup = second_df.copy()
        user_lookup['index'] = user_lookup['index'].astype('int')
        # Series.map needs a unique lookup index; keep the first row per key.
        user_lookup = user_lookup.drop_duplicates(subset='index').set_index('index')
        for column in user_columns:
            main_df[column] = main_df['index'].map(user_lookup[column])
    else:
        # Fallback to the previous positional behaviour when there is no key to join on.
        logging.warning("TEST_user_scoring has no 'index' column; matching rows by position")
        for column in user_columns:
            main_df[column] = main_df.index.map(second_df[column].to_dict())

    dataframe_upload('TEST_combine_dfs', previous_monday, main_df)

    end = time.time()
    timer = f"total time to run: {end-start}"
    logging.info(timer)

In [None]:
#combine_user_dfs()

## <a id='RDAP'></a>
# <center> Step #9: RDAP Scoring

In [None]:
def rdap_scored_df() -> None:
    """Add an RDAP domain-age score to last week's combined URL table.

    Loads up to 500000 rows of ``TEST_combine_dfs`` for the previous week,
    reduces every URL to ``<domain>.<suffix>``, and looks up the registration
    date of each distinct domain: first in the dictionary cached in GCS,
    otherwise from the RDAP server listed for the domain in the IANA bootstrap
    file.  The domain age is turned into a score by ``score_domain``.  The
    cache is written back to GCS and the frame, with ``domain`` and
    ``rdap_score`` columns added, replaces ``TEST_rdap``.

    Returns early (writing nothing) when the bootstrap file cannot be fetched.
    """
    import ast
    import concurrent.futures
    import datetime
    from google.cloud import bigquery, storage
    import logging
    import math
    import numpy as np
    import os
    import pandas as pd
    import requests
    import sys
    import threading
    import time
    import tldextract


    def read_dict_from_gcs(blob):
        """Download ``blob`` as text and parse it as a Python dict literal."""
        file_content = blob.download_as_text()

        # literal_eval only accepts Python literals, unlike eval.
        dictionary = ast.literal_eval(file_content)
        return dictionary


    def upload_dict_to_gcs(dictionary, bucket_name, destination_file_name):
        """Write ``str(dictionary)`` to ``destination_file_name`` in ``bucket_name``."""
        from google.cloud import storage
        dictionary_string = str(dictionary)

        client = storage.Client()
        bucket = client.get_bucket(bucket_name)

        # Stored as plain text so read_dict_from_gcs can parse it back.
        blob = bucket.blob(destination_file_name)
        blob.upload_from_string(dictionary_string, content_type='text/plain')

        print(f"Dictionary uploaded to GCS bucket {bucket_name} as {destination_file_name}.")


    def get_basic_rdap(attempts):
        """Fetch the IANA RDAP bootstrap JSON, retrying up to ``attempts`` times.

        Returns the parsed JSON, or None when every attempt failed.  Any
        unexpected error outside the request itself exits with status 61.
        """
        try:
            rdap_bootstrap_url = "https://data.iana.org/rdap/dns.json"

            # TODO: No proxy's are being exported, so no proxy is ever used.
            # Get proxy settings from environment variables (if there are any)
            proxies = {
                'http': os.environ.get('http_proxy'),
                'https': os.environ.get('https_proxy')
            }
            for i in range(attempts):
                # Fetch RDAP bootstrap data using proxy settings and 10 seconds timeout
                try:
                    bootstrap_data = requests.get(rdap_bootstrap_url, proxies=proxies, timeout=10).json()
                    logging.info(f'RDAP Successful {i}')
                    return bootstrap_data
                except requests.exceptions.Timeout:
                    logging.warning("Timeout error fetching RDAP bootstrap data.")
                except requests.exceptions.TooManyRedirects:
                    logging.warning("Too many redirects when fetching RDAP bootstrap data.")
                except requests.exceptions.RequestException as e:
                    logging.warning(f"Error fetching RDAP bootstrap data: {e}")
                except ValueError:
                    logging.warning("Invalid JSON received from RDAP bootstrap service.")
            return None
        except Exception as e:
            logging.error(f" EXCEPTION: {e}")
            logging.error("Error grabbing basic rdap JSON")
            sys.exit(61)


    def get_rdap_url(domain, bootstrap_data):
        """Return the first RDAP server URL listed for ``domain``, or None.

        Each entry of ``bootstrap_data['services']`` is read as
        ``[patterns, urls]``.  A pattern matches only on a whole-label
        boundary (``domain`` equals it or ends with ``"." + pattern``), and
        the longest matching pattern wins.  A plain ``endswith(pattern)``
        would let a short TLD match the tail of a different, longer TLD.
        Any unexpected error exits with status 62.
        """
        try:
            name = domain.lower()
            best_pattern = None
            best_url = None
            for entry in bootstrap_data['services']:
                for domain_pattern in entry[0]:
                    pattern = domain_pattern.lower()
                    if name != pattern and not name.endswith('.' + pattern):
                        continue
                    if best_pattern is None or len(pattern) > len(best_pattern):
                        best_pattern = pattern
                        # First server URL listed for this entry.
                        best_url = entry[1][0]
            # None when no entry matches.
            return best_url
        except Exception as e:
            logging.error(f" EXCEPTION: {e}")
            logging.error(f"Error getting rdap_url from {domain}")
            sys.exit(62)


    def score_domain(domain_age):
        """Map a domain age in days to a score in [0, 1]; younger domains score higher.

        The score is ``exp(-0.004 * age)`` rounded to two decimals.  A missing
        age (None/NaN) gets 0.5 and a negative age is treated as 0 days.
        """
        max_score = 1
        k = 0.004
        min_k = 0.0001
        max_k = 0.01
        # Keeps the decay constant inside [min_k, max_k] should it ever be changed.
        k = max(min_k, min(max_k, k))

        if domain_age is None or pd.isna(domain_age):
            base_line = max_score / 2
            return np.round(np.interp(base_line, [0, max_score], [0, 1]), 2)

        if domain_age < 0:
            domain_age = 0

        score = max_score * math.exp(-k * domain_age)
        score = max(0, min(max_score, score))
        return np.round(np.interp(score, [0, max_score], [0, 1]), 2)


    def domain_getter(url):
        """Return ``<domain>.<suffix>`` of ``url`` as split by tldextract."""
        extracted = tldextract.extract(url)
        return extracted.domain + '.' + extracted.suffix


    def domain_func(domain, bootstrap_data):
        """Return ``(domain, score)`` for one domain; ``score`` is None on any failure.

        Uses the cached registration date when there is one; otherwise queries
        the domain's RDAP server (at most 4 requests in flight, 10 s timeout)
        and caches the registration date it finds.
        """
        # NOTE: these counters are incremented from several worker threads without a
        # lock, so the totals logged at the end of the run may be slightly off.
        nonlocal no_tld_expert_failures, http_failures, no_events_failures, no_registration_failures, misc_failures, request_counter, already_cached

        try:
            if domain in domain_cache:
                creation_date = datetime.datetime.strptime(domain_cache[domain], '%Y-%m-%d')
                logging.info(f'Creation date: {creation_date}')
                # if we have made it to this point, we have a creation date
                current_date = datetime.datetime.today()
                age_in_days = (current_date - creation_date).days

                already_cached += 1

                logging.info(f'Already Cached: {already_cached}')
                return domain, score_domain(age_in_days)
        except Exception as e:
            logging.error(f'Exception for domain {domain}: {e}')
            return domain, None

        # If the domain starts with 'www.', remove the prefix
        if domain.startswith('www.'):
            domain = domain[4:]
        # Get the RDAP server URL for the domain
        rdap_url = get_rdap_url(domain, bootstrap_data)

        if not rdap_url:
            no_tld_expert_failures += 1
            if no_tld_expert_failures % 10 == 0:
                logging.warning(f"no_tld_expert_failures: {no_tld_expert_failures}")
            logging.warning(f"No tld expert found for {domain}")
            return domain, None
        # Construct the RDAP info URL
        if rdap_url[-1] == "/":
            rdap_info_url = f"{rdap_url}domain/{domain}"
            logging.info(f"didnt need slash")
        else:
            rdap_info_url = f"{rdap_url}/domain/{domain}"
            logging.info(f"needed slash")

        try:
            logging.info(f'Trying rdap_semaphore: {rdap_semaphore}')
            # The semaphore caps the number of simultaneous RDAP requests.
            with rdap_semaphore:
                request_counter += 1
                if request_counter % 1000 == 0:
                    logging.info(f"{request_counter} requests done {time.time()}")
                response = requests.get(rdap_info_url, timeout=10)

            if not response.ok:
                http_failures += 1
                if http_failures % 10 == 0:
                    logging.error(f"Error fetching RDAP info for {domain} from {rdap_info_url}: HTTP {response.status_code}. Reason: {response.reason}")
                    print(f"http_failures: {http_failures}")
                return domain, None
            response = response.json()
        except Exception as e:
            misc_failures += 1
            if misc_failures % 10 == 0:
                logging.warning(f"misc_failures: {misc_failures}")
                logging.error(f"exception during request getting {e} from domain: {domain} to url {rdap_info_url}")
            return domain, None

        # The body comes from a third-party server, so it may not be a JSON object.
        if not isinstance(response, dict) or 'events' not in response:
            no_events_failures += 1
            if no_events_failures % 1000 == 0:
                logging.error(f"no_events_failures: {no_events_failures}")
                logging.error(f"No events from {rdap_info_url}")
            return domain, None

        creation_date = None
        try:
            for event in response['events']:
                if event.get('eventAction') == 'registration':
                    creation_date = datetime.datetime.strptime(event['eventDate'][0:10], '%Y-%m-%d')
                    domain_cache[domain] = creation_date.strftime('%Y-%m-%d')
                    break
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            # A malformed event from one registry must not raise out of the worker
            # and abort the whole run; treat it as "no registration date".
            logging.warning(f"Malformed RDAP events for {domain} from {rdap_info_url}: {e}")
            creation_date = None

        # if we didn't find a creation date
        if creation_date is None:
            no_registration_failures += 1
            if no_registration_failures % 1000 == 0:
                print(f"no_registration_failures: {no_registration_failures}")
            logging.warning(f"No date found from {rdap_info_url}")
            return domain, None

        # if we have made it to this point, we have a creation date
        current_date = datetime.datetime.today()
        age_in_days = (current_date - creation_date).days

        # and we retrieve a final score using a "score_domain function"
        return domain, score_domain(age_in_days)


    logging.getLogger().setLevel(logging.INFO)
    cache_bucket_name = "suspicious_user_bucket"
    cache_file_name = "rdap_cache/rdap_domains.txt"
    blob = storage.Client().get_bucket(cache_bucket_name).blob(cache_file_name)
    # domain -> registration date ('%Y-%m-%d'); starts empty when no cache file exists.
    domain_cache = {}
    if blob.exists():
        domain_cache = read_dict_from_gcs(blob)
    logging.info("Got Cache")

    no_tld_expert_failures = http_failures = no_events_failures = no_registration_failures = misc_failures = request_counter = already_cached = 0
    start = time.time()

    client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
    todayDate = datetime.date.today()
    # Monday of the week before the current one.
    previous_monday = todayDate + datetime.timedelta(days=-todayDate.weekday(), weeks=-1)
    logging.info("sending query")
    query = f"""SELECT * 
    FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_combine_dfs` 
    WHERE week_of = '{previous_monday}'
    limit 500000"""
    df = client.query(query).to_dataframe()
    df_len = len(df)
    logging.info(f"Loaded {df_len} rows")
    # Created here so 'rdap_score' keeps its place before 'domain' in the column order.
    df['rdap_score'] = None
    # Shuffle the rows.
    df = df.sample(frac=1)
    logging.info("Getting domains")
    df['domain'] = df['url'].apply(domain_getter)

    domains = set(df['domain'])
    logging.info(f"{len(domains)} domains")
    bootstrap_data = get_basic_rdap(attempts=10)
    if bootstrap_data is None:
        logging.critical("PLEASE HELP RDAP FAILED 10 TIMES IN A ROW THIS IS REALLY REALLY BAD")
        return
    rdap_semaphore = threading.Semaphore(4)
    logging.info("Starting threads")

    # One task per distinct domain; `answers` maps domain -> score (or None), and the
    # score column is filled from it once every task has finished.
    answers = {}

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(domain_func, domain, bootstrap_data): domain for domain in domains}
        for future in concurrent.futures.as_completed(futures):
            domain, result = future.result()
            answers[domain] = result
    df['rdap_score'] = df.domain.map(answers)

    # Persist the cache, including the dates learned during this run.
    upload_dict_to_gcs(domain_cache, cache_bucket_name, cache_file_name)
    print(df.info())
    df.to_gbq(destination_table=f'umrf_murl_v2_results.TEST_rdap', 
        project_id='fxs-gccr-sbd-dev-sandbox', if_exists='replace')
    end = time.time()
    timer = f"total time to run: {end-start}"
    logging.info(timer)
    logging.info(f"tld expert failures: {no_tld_expert_failures}")
    logging.info(f"http failures: {http_failures}")
    logging.info(f"No events found failures: {no_events_failures}")
    logging.info(f"no registration dates: {no_registration_failures}")
    logging.info(f"Misc. Failures {misc_failures}")
    total_failures = no_tld_expert_failures + http_failures + no_events_failures + no_registration_failures + misc_failures
    logging.info(f"Total Failures: {total_failures}")
    logging.info(f"requests sent: {request_counter}")
    logging.info(f"Domains already cached {already_cached}")
    try:
        logging.info(f"Failure rate: {total_failures / len(domains)}")
    except ZeroDivisionError:
        logging.info("failure rate: Undefined no requests sent.")

In [None]:
# rdap_scored_df()

## <a id='total'></a>
# <center> Step #10: Find Total Score

In [None]:
def top_url_by_domain() -> None:
    """Rank last week's RDAP-scored URLs and publish the top candidates.

    Reads the previous week's rows of ``TEST_rdap``, keeps one URL per
    ``<last subdomain label>.<domain>.<suffix>`` key, adds
    ``total_phish_score`` (phish + rdap + user count) and ``total_score``
    (the same plus murl), min-max scales them, and counts suspicious TLD
    pieces and keywords in each URL.  URLs that start with a known shortening
    service are split off into their own table and kept out of the ranking.
    From the rest, one URL per domain is kept, 3500 rows are selected for
    VirusTotal and 75 as high scores.

    Uploads: ``TEST_shortening_services_results``, ``TEST_high_score_results``,
    ``TEST_full_score_results`` and (appended)
    ``umrf_murl_v2_virustotal.virus_total_results_refined``.
    Any exception is logged and swallowed.
    """
    #sift through input file, take top 3500 URLs
    import datetime
    import logging
    import pandas as pd
    import collections
    from collections import Counter
    import tldextract
    from typing import NamedTuple
    from sklearn.preprocessing import MinMaxScaler
    import gcsfs
    import numpy as np
    from google.cloud import bigquery, storage
    import re

    def dataframe_upload(table, monday, df):
        """Clear ``monday``'s rows from ``table`` and then upload ``df`` to it."""
        client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
        query = f"DELETE FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.{table}` WHERE week_of = '{monday}'"
        try:
            # Wait for the DELETE to finish: left running in the background it can
            # execute after the upload below and remove the freshly written rows.
            client.query(query).result()
        except Exception as e:
            # Best effort, as before (e.g. the table does not exist on a first run).
            logging.warning(f"Could not clear {table} for week {monday}: {e}")
        # NOTE(review): if_exists='replace' rewrites the whole table, so rows of
        # earlier weeks are dropped and the DELETE above is redundant -- confirm
        # whether 'append' was intended.
        df.to_gbq(destination_table=f'umrf_murl_v2_results.{table}',
                project_id='fxs-gccr-sbd-dev-sandbox', chunksize=1_000_000, if_exists='replace')

    def domain_purger(url):
        """Return the de-duplication key of ``url``: last subdomain label + domain + suffix.

        When the URL cannot be split, the URL itself is returned so that the
        row is left out of the purge instead of being merged with every other
        failing row.
        """
        try:
            extracted = tldextract.extract(url)
            url_sub = extracted.subdomain.split('.')[-1]
            if len(url_sub) > 0:
                url_sub = url_sub + '.'
            return url_sub + extracted.domain + '.' + extracted.suffix
        except Exception:
            logging.critical("Failed to purge similar urls, skipping this part!")
            return url

    def count_suspicious_tlds(url):
        """Count the dot-separated pieces of ``url`` for which tldextract reports a suffix."""
        return sum(1 for part in url.split('.') if tldextract.extract(part).suffix)

    def count_suspicious_keyword(url):
        """Count how many entries of the suspicious keyword list occur in ``url``."""
        # check for sus keywords in url
        suspicious_keywords = [
        'login', 'signin', 'password', 'secure', 'account', 'verification', 'validate', 'confirm', 'token', 'update', 'registry', 'payment', 'credit',
        'transaction', 'admin', 'service', 'webmaster', 'helpdesk', 'paypal', 'ebay', 'amazon', 'bank', 'wellsfargo', 'chase', 'citi', 'boa', 'fedex', 'microsoft', 
        '.exe', '.zip', '.rar', '.doc', '.xls', '.pdf', 
        'free', 'gift', 'promo', 'offer', 'download', '.dll', 'prize', 'reward', 'sweepstakes', 'lottery', 'winner', 'congratulations',
        'script', 'stream', 'play', 'game', 'invoke', 'download', 'cdn', 'media', 'video', 'manga'
        ]
        lowered = url.lower()
        return sum(keyword in lowered for keyword in suspicious_keywords)

    def count_adult_sus_words(url):
        """Count how many entries of the adult keyword list occur in ``url``."""
        # check for 'adult' keywords in url
        adult_sus_words = [
        'camgirl', 'porn', 'xxx',  'pornhub', 'xvideo', 'xhamster', 'nsfw', 'only','feet', 'kink', 'nude', 'leaked', 'boob', 'boobs', 'lesbian', 'blacked', 
        'leaks', 'fap', 'voyuer'
        ]
        lowered = url.lower()
        return sum(keyword in lowered for keyword in adult_sus_words)

    def high_scores_getter(df, total_urls):
        """Select about ``total_urls`` rows of ``df`` from six non-overlapping rankings.

        In order, each pick being removed from the pool before the next one:
        20% by ``total_phish_score``, 20% by ``total_score``, then 15% each by
        ``murl_score`` among rows with a suspicious TLD count, a suspicious
        keyword, an adult keyword, and finally among whatever remains.
        """
        pool = df.copy()

        top_phish_df = pool.nlargest(int(total_urls * .2), 'total_phish_score')
        pool = pool.drop(index=top_phish_df.index)

        top_total_df = pool.nlargest(int(total_urls * .2), 'total_score')
        pool = pool.drop(index=top_total_df.index)

        logging.info(pool['tld_count'].value_counts())
        top_murl_suspicious_tld_df = pool[pool['tld_count'] > 0].nlargest(int(total_urls * .15), 'murl_score')
        pool = pool.drop(index=top_murl_suspicious_tld_df.index)

        logging.info(pool['suspicious_keyword'].value_counts())
        top_murl_suspicious_keyword_df = pool[pool['suspicious_keyword'] > 0].nlargest(int(total_urls * .15), 'murl_score')
        pool = pool.drop(index=top_murl_suspicious_keyword_df.index)

        logging.info(pool['adult_sus_words'].value_counts())
        top_murl_adult_sus_keyword_df = pool[pool['adult_sus_words'] > 0].nlargest(int(total_urls * .15), 'murl_score')
        pool = pool.drop(index=top_murl_adult_sus_keyword_df.index)

        top_murl_df = pool.nlargest(int(total_urls * .15), 'murl_score')

        logging.info("Got small dataframes")
        # Concatenate the DataFrames
        high_scores_df = pd.concat([top_phish_df, top_total_df, top_murl_suspicious_tld_df, top_murl_suspicious_keyword_df, top_murl_df, top_murl_adult_sus_keyword_df])
        logging.info("put dataframes together")
        return high_scores_df


    try:
        logging.getLogger().setLevel(logging.INFO)

        client = bigquery.Client(project="fxs-gccr-sbd-dev-sandbox")
        todayDate = datetime.date.today()
        # Monday of the week before the current one.
        previous_monday = todayDate + datetime.timedelta(days=-todayDate.weekday(), weeks=-1)
        query = f"SELECT * FROM `fxs-gccr-sbd-dev-sandbox.umrf_murl_v2_results.TEST_rdap` WHERE week_of = '{previous_monday}'"
        logging.info("querying...")
        full_df = client.query(query).to_dataframe()
        logging.info("successfully queried")
        logging.info(f'This is the length before purging similar urls: {len(full_df)}')
        full_df['url_purger'] = full_df['url'].apply(domain_purger)

        full_df.drop_duplicates(subset='url_purger', inplace=True)
        logging.info(f'This is the length after purging similar urls: {len(full_df)}')
        full_df.drop(columns=['url_purger'], inplace=True)
        logging.info("purge ended")

        # Assign back instead of fillna(inplace=True) on the column: the chained
        # in-place form does not update the frame under pandas Copy-on-Write.
        full_df['rdap_score'] = full_df['rdap_score'].fillna(0)
        full_df['total_phish_score'] = full_df['phish_score'] + full_df['rdap_score'] + full_df['user_count_score']
        full_df['total_score'] = full_df['phish_score'] + full_df['rdap_score'] + full_df['user_count_score'] + full_df['murl_score']
        logging.info("added scores")

        full_df['tld_count'] = full_df['url'].apply(count_suspicious_tlds)
        full_df['suspicious_keyword'] = full_df['url'].apply(count_suspicious_keyword)
        full_df['adult_sus_words'] = full_df['url'].apply(count_adult_sus_words)

        scaler = MinMaxScaler()
        full_df['tld_count'] = scaler.fit_transform(np.array(full_df['tld_count']).reshape(-1, 1))
        full_df['total_phish_score'] = scaler.fit_transform(full_df['total_phish_score'].values.reshape(-1, 1))
        full_df['total_score'] = scaler.fit_transform(full_df['total_score'].values.reshape(-1,1))

        logging.info("added normalized and added counts")
        shorteners = [
            "bit.ly", "goo.gl", "shorte.st", "go2l.ink", "x.co", "ow.ly", "t.co", "tinyurl",
            "tr.im", "is.gd", "cli.gs", "yfrog.com", "migre.me", "ff.im", "tiny.cc", "url4.eu",
            "twit.ac", "su.pr", "twurl.nl", "snipurl.com", "short.to", "BudURL.com", "ping.fm",
            "post.ly", "Just.as", "bkite.com", "snipr.com", "fic.kr", "loopt.us", "doiop.com",
            "short.ie", "kl.am", "wp.me", "rubyurl.com", "om.ly", "to.ly", "bit.do", "lnkd.in",
            "db.tt", "qr.ae", "adf.ly", "bitly.com", "cur.lv", "tinyurl.com", "ity.im", "q.gs",
            "po.st", "bc.vc", "twitthis.com", "u.to", "j.mp", "buzurl.com", "cutt.us", "u.bb",
            "yourls.org", "prettylinkpro.com", "scrnch.me", "filoops.info", "vzturl.com", "qr.net",
            "1url.com", "tweez.me", "v.gd", "link.zip.net"]
        # Non-capturing group: str.contains warns when the pattern has capture groups.
        match = r"^(?:%s)" % "|".join(map(re.escape, shorteners))
        shortening_filter = full_df['url'].str.contains(match, na=False) # a filter of any url that matches a shortening services
        logging.info("Made Filter")
        shortening_df = full_df[shortening_filter]  # new df of only rows included with shortening services.
        logging.info("Used Filters")

        # Show, then drop, the rows that lack any of the scores used for ranking.
        for score_column in ('murl_score', 'total_phish_score', 'total_score'):
            print(full_df[full_df[score_column].isnull()])
            full_df = full_df.dropna(subset=[score_column])

        # Rank only URLs that are not shortening services.  Previously the filtered
        # frame was overwritten by a selection from the unfiltered full_df, so the
        # shortener URLs were never actually excluded from top_df.
        candidates_df = full_df[~full_df.index.isin(shortening_df.index)]
        top_df = pd.concat([
            candidates_df.loc[candidates_df.groupby('domain')['murl_score'].idxmax()],
            candidates_df.loc[candidates_df.groupby('domain')['total_phish_score'].idxmax()],
            candidates_df.loc[candidates_df.groupby('domain')['total_score'].idxmax()],
            ])
        logging.info("filtered by domain")
        top_df.drop_duplicates(subset='domain', inplace=True)  # leaves only 1 url per domain. 
        #^ in the future maybe give a reason to which is chosen? think currently it's whichever is first
        logging.info("dropped duplicate domains")
        logging.info(f"{len(top_df)} urls left")

        virus_total_df = high_scores_getter(top_df, 3500)
        high_scores_df = high_scores_getter(top_df, 75)

        virus_total_df['is_high_score'] = virus_total_df['url'].isin(high_scores_df['url'])
        virus_total_df['has_been_virus_totaled'] = False
        virus_total_df['positives'] = None
        virus_total_df['total'] = None

        dataframe_upload('TEST_shortening_services_results', previous_monday, shortening_df)
        dataframe_upload('TEST_high_score_results', previous_monday, high_scores_df)
        dataframe_upload('TEST_full_score_results', previous_monday, full_df)

        virus_schema=[
            {"name": "index", "type": "INTEGER"},
            {"name": "url", "type": "STRING"},
            {"name": "phish_score", "type": "FLOAT"},
            {"name": "murl_score", "type": "FLOAT"},
            {"name": "week_of", "type": "TIMESTAMP"},
            {"name": "user_count_score", "type": "FLOAT"},
            {"name": "user_count", "type": "INTEGER"},
            {"name": "users_list", "type": "STRING"},
            {"name": "rdap_score", "type": "FLOAT"},
            {"name": "domain", "type": "STRING"},
            {"name": "total_phish_score", "type": "FLOAT"},
            {"name": "total_score", "type": "FLOAT"},
            {"name": "tld_count", "type": "FLOAT"},
            {"name": "suspicious_keyword", "type": "INTEGER"},
            {"name": "adult_sus_words", "type": "FLOAT"},
            {"name": "is_high_score", "type": "BOOLEAN"},
            {"name": "has_been_virus_totaled", "type": "BOOLEAN"},
            {"name": "positives", "type": "INTEGER"},
            {"name": "total", "type": "INTEGER"}
        ]
        virus_total_df.to_gbq(destination_table=f'umrf_murl_v2_virustotal.virus_total_results_refined', 
            project_id='fxs-gccr-sbd-dev-sandbox', if_exists='append', table_schema=virus_schema)

        return
    except Exception as e:
        # NOTE(review): the error is only logged, so the caller sees this step as
        # successful even when nothing was uploaded -- confirm this is intended.
        logging.error(e)
        logging.error("Outer function Failed.", exc_info=True)

In [None]:
#top_url_by_domain()

## <a id='pipeline'></a>
# <center> Step #11: KubeFlow Pipelining

### <center> Overwrite definitions with cached versions to speed up and save money during testing


In [None]:
# UNCOMMENT THESE AND CHANGE FILES WHENEVER YOU NEED TO USE THE CACHED VERSION FOR SPEED/MONEY

def query_job():
    """No-op replacement for the query step: does nothing and returns None."""
    return None

def early_RDAP():
    """No-op replacement for the early RDAP step: does nothing and returns None."""
    return None

def megapreprocess_op():
    """No-op replacement for the preprocessing step: does nothing and returns None."""
    return None

# def run_batch_preds():
#     return

def get_user_count_scoring():
    """No-op replacement for the user count scoring step: does nothing and returns None."""
    return None

def combine_user_dfs():
    """No-op replacement for the combine step: does nothing and returns None."""
    return None

def rdap_scored_df():
    """No-op replacement for the RDAP scoring step: does nothing and returns None."""
    return None


def top_url_by_domain():
    """No-op replacement for the total scoring step: does nothing and returns None."""
    return None

## <a id='component'></a>
# <center> Step #11.1: Component Initialization

In [None]:
import kfp.components as comp
from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_from_component

# Base image shared by every component built in this cell.
_BASE_IMAGE = 'python:3.9'


def _as_custom_job(component, machine_type, display_name):
    """Wrap ``component`` as a single-replica custom training job run as SERVICE_ACCOUNT."""
    return create_custom_training_job_from_component(
        component_spec=component,
        replica_count=1,
        machine_type=machine_type,
        display_name=display_name,
        service_account=SERVICE_ACCOUNT,
    )


# --- Query -------------------------------------------------------------------
query_job_component = comp.create_component_from_func(
    func=query_job,  # Python function we're creating a component from
    base_image=_BASE_IMAGE,
    packages_to_install=['pandas','google-cloud-bigquery[pandas]', 'google-cloud-storage', 'pandas-gbq'],
    output_component_file='query_job_comp.yaml',
)
query_custom_job = _as_custom_job(query_job_component, "n1-standard-16", "Query custom job")

# --- Preprocessing -----------------------------------------------------------
preprocessing = comp.create_component_from_func(
    func=megapreprocess_op,
    base_image=_BASE_IMAGE,
    packages_to_install=['pandas', 'scikit-learn', 'tld', 'google-cloud-storage','fsspec','gcsfs', 'pandas-gbq','tldextract'],
    output_component_file='preprocess_comp.yaml',
)
preprocessing_custom_job = _as_custom_job(preprocessing, "c2-standard-30", "preprocessing custom job")

# --- User count scoring ------------------------------------------------------
user_scoring = comp.create_component_from_func(
    func=get_user_count_scoring,
    base_image=_BASE_IMAGE,
    packages_to_install=['numpy','pandas','google-cloud-bigquery[pandas]', 'google-cloud-storage','fsspec','gcsfs', 'scikit-learn', 'pandas-gbq','tldextract'],
    output_component_file='user_scoring_comp.yaml',
)
user_scoring_custom_job = _as_custom_job(user_scoring, "n1-standard-16", "User Scoring custom job")

# --- Batch predictions -------------------------------------------------------
batch_predictions = comp.create_component_from_func(
    func=run_batch_preds,
    base_image=_BASE_IMAGE,
    packages_to_install=['google-cloud-bigquery[pandas]', 'google-cloud-storage','google-cloud-aiplatform','fsspec','gcsfs', 'pandas-gbq','tldextract'],
    output_component_file='run_batch_preds.yaml',
)
batch_predictions_custom_job = _as_custom_job(batch_predictions, "c2-standard-30", "Batch Predictions custom job")

# --- Combine outputs ---------------------------------------------------------
combine_dfs = comp.create_component_from_func(
    func=combine_user_dfs,
    base_image=_BASE_IMAGE,
    packages_to_install=['numpy','pandas','google-cloud-bigquery[pandas]', 'google-cloud-storage','fsspec','gcsfs', 'pandas-gbq','tldextract'],
    output_component_file='combine_dfs_comp.yaml',
)
combine_custom_job = _as_custom_job(combine_dfs, "n1-highmem-16", "Combine outputs custom job")

# --- Early RDAP --------------------------------------------------------------
early_rdap_comp = comp.create_component_from_func(
    func=early_RDAP,
    base_image=_BASE_IMAGE,
    packages_to_install=['numpy', 'requests','pandas', 'google-cloud-storage','fsspec','gcsfs', 'google-cloud-aiplatform', 'pandas-gbq','tldextract'],
    output_component_file='early_rdap_comp.yaml',
)
early_rdap_custom_job = _as_custom_job(early_rdap_comp, "n1-highmem-16", "Early rdap custom job")

# --- RDAP scoring and total scoring (used as plain components, no custom job) --
rdap_scored_comp = comp.create_component_from_func(
    func=rdap_scored_df,
    base_image=_BASE_IMAGE,
    packages_to_install=['numpy', 'requests','pandas', 'google-cloud-storage','fsspec','gcsfs', 'pandas-gbq', 'tldextract'],
    output_component_file='rdap_comp.yaml',
)

new_total_scoring = comp.create_component_from_func(
    func=top_url_by_domain,
    base_image=_BASE_IMAGE,
    packages_to_install=['numpy', 'pandas', 'google-cloud-storage','fsspec','gcsfs', 'scikit-learn', 'pandas-gbq', 'tldextract'],
    output_component_file='total_scoring_comp.yaml',
)

## <a id='define'></a>
# <center> Step #11.2: Defining the pipeline

In [None]:
import kfp
from kfp import dsl
from kfp import components as comp

# TESTING
# DEFINING PIPELINE
@dsl.pipeline(
    name='testrunpipeline',
    description='ML pipeline test'
)
def pipeline2():
    """Wire the URL-scoring tasks into a pipeline graph.

    This function only declares tasks and their ordering. Its body is
    executed when the pipeline is compiled, not while the pipeline runs,
    so it must not report per-step progress: earlier "Started"/"Done" log
    calls fired at compile time and were removed for that reason.
    """
    # Dependency layout (each arrow is an explicit .after() ordering):
    #
    #   query --> preprocessing --> batch predictions --+
    #   query --> user scoring -------------------------+--> combine
    #   query --> early RDAP ---------------------------+
    #   combine --> RDAP scoring --> total scoring

    # Root task; every other task runs downstream of it.
    query_step = query_custom_job(project=PROJECT_ID)

    # Three branches that only need the query to have finished.
    preprocessing_step = preprocessing_custom_job(project=PROJECT_ID).after(query_step)
    user_scoring_step = user_scoring_custom_job(project=PROJECT_ID).after(query_step)
    early_rdap_wait = early_rdap_custom_job(project=PROJECT_ID).after(query_step)

    # Batch predictions are ordered after preprocessing.
    batch_predictions_step = batch_predictions_custom_job(
        project=PROJECT_ID
    ).after(preprocessing_step)

    # Join point: wait for all three branches before combining.
    combine_dfs_step = combine_custom_job(project=PROJECT_ID).after(
        batch_predictions_step,
        user_scoring_step,
        early_rdap_wait,
    )

    # Final two stages run as plain components (no custom-job wrapper).
    rdap_scored_step = rdap_scored_comp().after(combine_dfs_step)
    new_total_scoring().after(rdap_scored_step)

## <a id='compile'></a>
# <center> Step 11.3: Compile The Pipeline

In [None]:
# Compile pipeline2 into a pipeline spec written to scaled_test.json.
# A deprecation warning may be printed during this step.
compiler.Compiler().compile(package_path="scaled_test.json", pipeline_func=pipeline2)

## <a id='submit'></a>
# <center> Step 11.4: Submitting the pipeline

In [None]:
# Point the SDK at the target project and region.
aiplatform.init(location=REGION, project=PROJECT_ID)

# Create a job from the compiled spec, with caching turned off.
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path="scaled_test.json",
    enable_caching=False,
)

# NOTE(review): this service account is hard-coded, whereas the custom jobs
# are configured with SERVICE_ACCOUNT. Confirm whether the two should match.
pipeline_.submit(
    service_account='my-bigquery-sa@fxs-gccr-sbd-dev-sandbox.iam.gserviceaccount.com'
)