### Initialize

In [1]:
spark

In [2]:
spark.sql("SET spark.sql.shuffle.partitions = 1024")

DataFrame[key: string, value: string]

In [3]:
from datetime import datetime, timedelta
import pickle
from random import randint, choice
import re
import string

import pandas as pd

### Load in dataset of Covid-19-related pages
* Note: this is based on: https://covid-data.wmflabs.org/pagesNoHumans
* Additional preprocessing is done to remove duplicates, non-wiki, and non-namespace 1 articles.
* The following articles (in all languages) are also added: [Coronavirus](https://en.wikipedia.org/wiki/Coronavirus)

In [4]:
# TSV: "pagesPerProjectNonHumans20200330.tsv"
# Pickle: "pagesPerProjectNonHumans20200330.pickle"
# Parquet: "/user/dsaez/pagesRelatedWithCOVID19-upto20200301.parquet"
pages_path = "pagesPerProjectNonHumans20200420_pluscoronavirus.tsv"
covid_table_name = 'covid_pages'
if pages_path.endswith('.parquet'):
    spark.read.parquet(pages_path).createOrReplaceTempView(covid_table_name)
elif pages_path.endswith('.pickle'):
    print("Current Pandas version: {0} -- probably not compatible with file and can't be upgraded per: "
          "https://wikitech.wikimedia.org/wiki/SWAP#Spark".format(pd.__version__))
    df = pd.read_pickle(pages_path)
    spark.createDataFrame(df).createOrReplaceTempView(covid_table_name)
elif pages_path.endswith('.tsv'):
    df = pd.read_csv(pages_path, sep='\t')
    df['page_id'] = df['pageid']
    df = df[['project', 'page_id']]
    spark.createDataFrame(df).createOrReplaceTempView(covid_table_name)
else:
    print("Did not recognize file-type.")

In [5]:
spark.sql('describe {0}'.format(covid_table_name)).show() 

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| project|   string|   null|
| page_id|   bigint|   null|
+--------+---------+-------+



In [6]:
# NOTE: I haven't rerun this since I removed the page_title and P31-Label columns
spark.sql('SELECT * FROM {0} WHERE page_title LIKE "%2019%" AND project = "en.wikipedia"'.format(covid_table_name)).show(100)

+--------------------+------------+--------------------+--------+
|          page_title|     project|           P31-Label|  pageid|
+--------------------+------------+--------------------+--------+
|2019–20 coronavir...|en.wikipedia|{'public health e...|62750956|
|2019–20 coronavir...|en.wikipedia|{'Wikimedia list ...|62938755|
|2019–20 coronavir...|en.wikipedia|{'disease outbrea...|63039926|
|2019–20 coronavir...|en.wikipedia|{'disease outbreak'}|63183247|
|2019–20 coronavir...|en.wikipedia|{'disease outbreak'}|63325727|
|2019–20 coronavir...|en.wikipedia|{'disease outbreak'}|63004998|
|Coronavirus disea...|en.wikipedia|{'emerging infect...|63030231|
|List of deaths du...|en.wikipedia|{'Wikimedia list ...|63417935|
|List of events af...|en.wikipedia|{'Wikimedia list ...|63351852|
|Mental health dur...|en.wikipedia|              {None}|63499429|
|Shortages related...|en.wikipedia|              {None}|63453597|
|Timeline of the 2...|en.wikipedia|{'Wikimedia timel...|63052529|
|Travel re

In [6]:
count_per_db = spark.sql('SELECT project, count(*) FROM {0} GROUP BY project'.format(covid_table_name))
count_per_db.sort('count(1)', ascending=False).show(100)

+--------------------+--------+
|             project|count(1)|
+--------------------+--------+
|        en.wikipedia|     367|
|        ar.wikipedia|     250|
|        pt.wikipedia|     184|
|        zh.wikipedia|     184|
|        de.wikipedia|     175|
|        ko.wikipedia|     161|
|        vi.wikipedia|     146|
|        uk.wikipedia|     117|
|        fr.wikipedia|     112|
|        he.wikipedia|      96|
|        es.wikipedia|      82|
|        tr.wikipedia|      82|
|        ru.wikipedia|      70|
|        ca.wikipedia|      60|
|        pl.wikipedia|      59|
|        nl.wikipedia|      50|
|        id.wikipedia|      47|
|        be.wikipedia|      43|
|        uz.wikipedia|      38|
|        ms.wikipedia|      38|
|        pa.wikipedia|      38|
|        et.wikipedia|      38|
|        it.wikipedia|      37|
|    zh-yue.wikipedia|      37|
|        ja.wikipedia|      35|
|        ht.wikipedia|      34|
|        az.wikipedia|      33|
|        ta.wikipedia|      32|
|       

### Generate table with anonymized/filtered webrequests

In [11]:
do_execute = True
table_name = 'isaacj.covid19_sessions'
create_table_query = """
    CREATE EXTERNAL TABLE IF NOT EXISTS {0} (
        session_hash     STRING  COMMENT 'Unique identifier for session',
        continent        STRING  COMMENT 'Reader continent per IP geolocation',
        country          STRING  COMMENT 'Reader country per IP geolocation',
        subdivision      STRING  COMMENT 'Reader subdivision per IP geolocation',
        timezone         STRING  COMMENT 'Reader timezone per IP geolocation',
        hour             INT     COMMENT 'Unpadded hour of request',
        project          STRING  COMMENT 'Wikipedia project -- e.g., en.wikipedia',
        namespace_id     INT     COMMENT '0 for articles; 1 for talk pages',
        qid              STRING  COMMENT 'Wikidata ID associated with article -- e.g., Q42',
        page_id          INT     COMMENT 'Page ID for article viewed -- automatically resolves redirects',
        title            STRING  COMMENT 'Page title viewed -- preserves redirects taken',
        is_covid         BOOLEAN COMMENT '1 if matches Covid-19 list; 0 otherwise',
        referer          STRING  COMMENT 'Host of referer URL -- e.g., www.google.com',
        referer_class    STRING  COMMENT 'High-level class of referer (external, search engine, direct, internal)',
        access_method    STRING  COMMENT 'Version of Wikipedia viewed -- desktop or mobile',
        last_access      STRING  COMMENT 'Date device was last seen per WMF-Last-Access cookie',
        min_btw_pvs      DOUBLE  COMMENT 'Integer # of minutes since prior pageview in session',
        session_sequence INT     COMMENT 'Index of pageview in session -- starting at 1 and ordered chronologically'
    )
    PARTITIONED BY (
        year             INT     COMMENT 'Unpadded year of request',
        month            INT     COMMENT 'Unpadded month of request',
        day              INT     COMMENT 'Unpadded day of request')
    STORED AS PARQUET
    LOCATION 'hdfs://analytics-hadoop/user/isaacj/covid19'
    """.format(table_name)

if do_execute:
    spark.sql(create_table_query)

### Populate Table
Filtered down to:
* Wikipedia project family
* Pageviews
* Namespace = 0 (articles) and Namespace = 1 (talk pages)
* No sessions with evidence of edit activity
* Agents labeled as users (NOTE: this is imperfect so users w/ >500 pageviews are also removed)

Anonymization steps:
* Hash user-agent/IP
* Only retain geographic information at the subdivision level (no cities, latitude, or longitude)
* Blacklist certain small or sensitive countries
* Only retain a sample of users to reduce likelihood that an individual will appear across multiple days in the dataset

In [11]:
first_date = "2020-01-01"
num_days = 90
session_length = 60 * 60  # 1 hour = 60 sec * 60 min

dt = datetime.strptime(first_date, "%Y-%m-%d")
print("To query from {0} up to but not including {1}".format(datetime.strftime(dt, "%Y-%m-%d"),
                                                             datetime.strftime(dt + timedelta(days=num_days), "%Y-%m-%d")))

To query from 2020-01-08 up to but not including 2020-01-20


In [None]:
# Acceptance criteria:
# * Control sessions: take 1% sample and retain if >= 500 unique userhashes
# * Covid-19 sessions: take 50% sample and retain if:
# ** control sample met threshold AND
# ** covid-19 sample also meets 500 unique userhashes threshold
print_for_hive = False
do_execute = True

for i in range(num_days):    
    # salts for UA/IP hash (1st = userhash for whole day; 2nd = session hashes)
    salt_one = ''.join(choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(randint(8,16)))
    salt_two = ''.join(choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(randint(8,16)))

    # matching by last digit of IP is ~random means of sampling devices (no obvious skew)
    # Due to IPv6, IP addresses can actually end in [0-9] or [a-f]
    # There are still many more IPv4 addresses than IPv6 though, so using a [0-9] gives you ~5x more data than a letter
    # North America has a slightly higher proportion of IPv6 so they are undersampled slightly through this method
    # This is not currently used in the full query but is useful for testing
    # and can be added to WHERE clauses when selecting from wmf.webrequest: AND SUBSTR(ip, -1, 1) = {7}
    ip_match = randint(0, 9)

    # we keep each day separate and use new salts / ip_matches because:
    #   * this keeps the processing simpler (e.g., for order-by clauses)
    #   * this helps to preserve anonymization by limiting the amount of data associated with a single user ID
    #   * a IP/UA hash has only been shown to captures most of the pageviews by an individual (~75% per former analyses)
    #     for a given 24 hour period. Beyond 24 hours, it is unclear how effective this method is.
    #   * limiting to 24-hours per hash should help ensure that we do not incorrectly merge multiple users together

    # query for a given day
    query_date = dt + timedelta(days=i)

    # We exclude edits by the following logic:
    # (uri_query LIKE '%action=edit%'): desktop wikitext editor
    # (uri_query LIKE '%action=visualeditor%'): desktop and mobile visualeditor
    # (uri_query LIKE '%&intestactions=edit&intestactionsdetail=full&uiprop=options%'): mobile wikitext editor
    # NOTE for mobile wikitext editor: this is actually an API call so theoretically anyone could make it, but 
    # in practice we see literally 100% of calls to intestactions=edit are mobile wikitext editor and the rest
    # of the URL parameters in the string is to ensure no false positives.
    
    # We also exclude mobile_app users because they are:
    # * a small proportion of our pageviews
    # * more identifiable as a result (and have unique app IDs in x_analytics_header)
    # * I have not figured out what their edit clauses look like
    
    # We exclude a blacklist of countries (wmf.geoeditors_blacklist_country)
    # with the exception of Iran and Russia given their research relevance for Covid-19
    
    # We exclude users with > 500 pageviews in a day because anecdotally these are split between:
    # * serious power users
    # * a bunch of different people all using the same proxy (e.g., Google Weblight)
    # * unidentified bots
    
    # We take 1% of all users and 50% of users who viewed a Covid-19-related page in their session:
    # we randomly sample individual users from a day's worth of data to reduce the likelihood of any one
    # person showing up consistently in each day's sample. This is done by taking a userhash (before sessionization)
    # and converting it to decimal and then sampling based on the final digits of that resulting number.
    # For example, a sha256 userhash might be '0d03031932aa1d96d54327ea4754b393a62feb61585fef6cf06daee9cf848b27'
    # For sampling, if we take the final 16 characters of that userhash ('f06daee9cf848b27')
    #   and convert that to decimal, it is: 17324695660796349223
    #   if we then divide this by 18446744073709551615, we will get a number between 0 and 1
    #   in this case it is 0.9391736336542796
    #   we can simply define our sampling by only retaining users for which that number X is below Y
    #   e.g., a 1% sample is X < 0.01 and a 50% sample would be X < 0.5 -- in both cases, this user would not be retained
    # Alternatively (for sampling): 
    #   In decimal, the example userhash would be: 5885388957350101327411052618704501086212173454334169283944732915681982712615
    #   If we take the final four digits of that (2615), we can determine whether the user is part of a 1% sample by
    #   testing whether that number is less than 100. In this case, no, but if the userhashes are randomly distributed
    #   (which they should be), then 1% of the final four digits should be between 0000 and 0099 and the other 99% should
    #   be from 0100 to 9999. This can be adjusted to e.g., 50% by checking whether the digits are less than 5000.
    
    # After all the pageviews associated with the sampled users are collected, we order them by timestamp and
    # separate them into 1 or more sessions, where a session is defined as >1 hour between consecutive pageviews.
    # Userhashes are then reassigned so it's not easy to determine whether a given user had multiple sessions in
    # a day or not.
    
    # Timestamps in the final dataset are reported only as year, month, day, hour. To preserve the research value
    # of knowing time between pageviews while still reducing the privacy risk that fine-grained timestamps pose,
    # we also compute the number of minutes (integer) between each pageview and indicate the order of pageviews.
    
    query = """
    WITH wikipedia_projects AS (
        SELECT DISTINCT hostname,
               dbname
          FROM wmf_raw.mediawiki_project_namespace_map
         WHERE snapshot = '2020-01'
               AND hostname LIKE '%wikipedia%'
        ),
    exploded_wikidata_links AS (
        SELECT wiki_db,
               page_id,
               FIRST_VALUE(item_id, true) OVER (PARTITION BY wiki_db, page_id ORDER BY snapshot DESC) as item_id
          FROM wmf.wikidata_item_page_link
         WHERE snapshot >= '2020-01-06'
               AND page_namespace = 0
        ),
    wikidata_ids AS (
        SELECT DISTINCT SUBSTR(p.hostname, 0, length(p.hostname)-4) AS project,
               wd.page_id AS page_id,
               wd.item_id AS item_id
          FROM exploded_wikidata_links wd
         INNER JOIN wikipedia_projects p
               ON (wd.wiki_db = p.dbname)
        ),
    blacklisted_countries AS (
        SELECT DISTINCT country
          FROM wmf.geoeditors_blacklist_country
         WHERE country NOT IN ('Russia', 'Iran')
        ),
    users_to_keep AS (
        SELECT sha2(CONCAT(user_agent, client_ip, '{4}'), 256) AS user,
               w.geocoded_data['country'] AS country,
               MAX(CAST(COALESCE(cvd.page_id, -1) > 0 AS int)) AS is_covid,
               COUNT(1) AS num_pviews
          FROM wmf.webrequest w
          LEFT JOIN covid_pages cvd
               ON (w.page_id = cvd.page_id AND pageview_info['project'] = cvd.project)
          LEFT ANTI JOIN blacklisted_countries bc
               ON (w.geocoded_data['country'] = country)
         WHERE webrequest_source = 'text'
               AND normalized_host.project_family = 'wikipedia'
               AND year = {1} AND month = {2} AND day = {3}
               AND agent_type = 'user'
               AND access_method <> 'mobile app'
               AND ((is_pageview AND (namespace_id = 0 OR namespace_id = 1))
                    OR (uri_query LIKE '%action=edit%' OR uri_query LIKE '%action=visualeditor%' OR uri_query LIKE '%&intestactions=edit&intestactionsdetail=full&uiprop=options%'))
             GROUP BY sha2(CONCAT(user_agent, client_ip, '{4}'), 256), w.geocoded_data['country']
            HAVING COUNT(1) < 500
                   AND MAX(CAST(NOT is_pageview as int)) = 0
                   AND ((is_covid > 0 AND CONV(SUBSTR(user, 49), 16, 10) / 18446744073709551615 < 0.5)
                        OR CONV(SUBSTR(user, 49), 16, 10) / 18446744073709551615 < 0.01)
        ),
    countries_to_drop AS (
           SELECT DISTINCT(country)
             FROM (
               SELECT country
                 FROM users_to_keep
                WHERE is_covid = 0
                GROUP BY country
               HAVING COUNT(DISTINCT(user)) < 500
                UNION ALL
               SELECT country
                 FROM users_to_keep
                GROUP BY country
               HAVING SUM(num_pviews * is_covid) > (0.90 * SUM(num_pviews))
                  ) c
        ),
    covid_to_drop AS (
           SELECT country
             FROM users_to_keep
            WHERE is_covid = 1
            GROUP BY country
           HAVING COUNT(DISTINCT(user)) < 500
        ),
       sessions AS (
           SELECT sha2(CONCAT(user_agent, client_ip, '{4}'), 256) AS user,
                  geocoded_data['continent'] AS continent,
                  geocoded_data['country'] AS country,
                  geocoded_data['subdivision'] AS subdivision,
                  geocoded_data['timezone'] AS timezone,
                  ts, hour,
                  pageview_info['project'] as project,
                  w.namespace_id as namespace_id,
                  COALESCE(wd.item_id, 'None') as qid,
                  w.page_id AS page_id,
                  pageview_info['page_title'] AS title,
                  CAST(COALESCE(cvd.page_id, -1) > 0 AS int) > 0 AS is_covid,
                  PARSE_URL(referer, 'HOST') as referer,
                  referer_class,
                  access_method,
                  COALESCE(x_analytics_map['WMF-Last-Access'], 'NULL') AS last_access,
                  CASE
                    WHEN (unix_timestamp(ts) - COALESCE(LAG(unix_timestamp(ts)) OVER (PARTITION BY sha2(CONCAT(user_agent, client_ip, '{4}'), 256) ORDER BY ts), 0)) >= {6}
                      THEN 1 ELSE 0
                  END AS new_session
             FROM wmf.webrequest w
            INNER JOIN users_to_keep u
                  ON (u.user = sha2(CONCAT(user_agent, client_ip, '{4}'), 256))
             LEFT ANTI JOIN countries_to_drop c
                  ON (c.country = w.geocoded_data['country'])
             LEFT ANTI JOIN covid_to_drop d
                  ON (d.country = w.geocoded_data['country']
                      AND u.is_covid = 1
                      AND CONV(SUBSTR(sha2(CONCAT(user_agent, client_ip, '{4}'), 256), 49), 16, 10) / 18446744073709551615 >= 0.01)
             LEFT JOIN covid_pages cvd
                  ON (w.pageview_info['project'] = cvd.project AND w.page_id = cvd.page_id)
             LEFT JOIN wikidata_ids wd
                  ON (w.pageview_info['project'] = wd.project AND w.page_id = wd.page_id)
            WHERE webrequest_source = 'text'
                  AND normalized_host.project_family = 'wikipedia'
                  AND user_agent IS NOT NULL AND client_ip IS NOT NULL
                  AND year = {1} AND month = {2} AND day = {3}
                  AND agent_type = 'user'
                  AND access_method <> 'mobile app'
                  AND is_pageview
                  AND (namespace_id = 0 OR namespace_id = 1)
        ),
      anonymized_sessions AS (
          SELECT sha2(CONCAT(user, '{5}', SUM(new_session) OVER (PARTITION BY user ORDER BY ts)), 256) AS session_hash,
                 continent, country, subdivision, timezone,
                 ts, hour,
                 project, qid, namespace_id, page_id, title, is_covid,
                 referer, referer_class, access_method, last_access
            FROM sessions s
        )         
       INSERT OVERWRITE TABLE {0}
       PARTITION(year={1}, month={2}, day={3})
       SELECT session_hash,
              continent, country, subdivision, timezone,
              hour,
              project, namespace_id, qid, page_id, title, is_covid,
              referer, referer_class, access_method, last_access,
              ROUND(COALESCE(unix_timestamp(ts) - LAG(unix_timestamp(ts)) OVER w, 0) / 60) as min_btw_pvs,
              ROW_NUMBER() OVER w as session_sequence
         FROM anonymized_sessions
       WINDOW w AS (PARTITION BY session_hash ORDER BY ts)
     """.format(table_name, query_date.year, query_date.month, query_date.day,
                salt_one, salt_two, session_length, ip_match)    
        
    if print_for_hive:
        print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
    else:
        print(re.sub(r'SUBSTR\(ip, -1, 1\) = [0-9]',
                     "SUBSTR(ip, -1, 1) = <#>",
                     re.sub(r"CONCAT\(user_agent, client_ip, '[a-zA-Z0-9]+'\)",
                            "CONCAT(user_agent, client_ip, '<SALT-1>')",
                            re.sub(r"CONCAT\(user, '[a-zA-Z0-9]+', SUM\(",
                                   "CONCAT(user, '<SALT-2>', SUM(",
                                   query))))
    
    if do_execute:
        spark.sql(query)

# clear salt and ip_match so not accidentally retained
ip_match = None
salt = None


    WITH wikipedia_projects AS (
        SELECT DISTINCT hostname,
               dbname
          FROM wmf_raw.mediawiki_project_namespace_map
         WHERE snapshot = '2020-01'
               AND hostname LIKE '%wikipedia%'
        ),
    exploded_wikidata_links AS (
        SELECT wiki_db,
               page_id,
               FIRST_VALUE(item_id, true) OVER (PARTITION BY wiki_db, page_id ORDER BY snapshot DESC) as item_id
          FROM wmf.wikidata_item_page_link
         WHERE snapshot >= '2020-01-06'
               AND page_namespace = 0
        ),
    wikidata_ids AS (
        SELECT DISTINCT SUBSTR(p.hostname, 0, length(p.hostname)-4) AS project,
               wd.page_id AS page_id,
               wd.item_id AS item_id
          FROM exploded_wikidata_links wd
         INNER JOIN wikipedia_projects p
               ON (wd.wiki_db = p.dbname)
        ),
    blacklisted_countries AS (
        SELECT DISTINCT country
          FROM wmf.geoeditors_blacklist_country
         WH

In [None]:
year = 2020
month = 1
day = 1
spark.sql("""
    SELECT country,
           SUM(is_covid) as covid_sessions,
           COUNT(1) - SUM(is_covid) as non_covid_sessions,
           SUM(num_pviews * is_covid) as covid_pviews,
           SUM(num_pviews) - SUM(num_pviews * is_covid) as noncovid_pviews
      FROM (SELECT session_hash,
                   country,
                   CAST(MAX(is_covid) as int) as is_covid,
                   COUNT(1) as num_pviews
              FROM {0}
             WHERE year = {1} and month = {2} and day = {3}
             GROUP BY session_hash, country
           ) s
     GROUP BY country
     ORDER BY covid_pviews DESC
     LIMIT 1000
     """.format(table_name, year, month, day)).show(300)

In [None]:
# if need to push the data to a TSV for further processing (not currently used)
write_to_tsv = False
if write_to_tsv:
    for i in range(num_days):
        query_date = dt + timedelta(days=i)

        day_data = spark.sql('SELECT * FROM {0} WHERE year = {1} and month = {2} and day = {3}'.format(
            table_name, query_date.year, query_date.month, query_date.day))
        day_data.show(10)
        day_data.coalesce(1).write.csv(path="/user/isaacj/covid19_tsvs/wr_{0}".format(t_date), compression="gzip", header=True, sep="\t")

## Generate intermediate data for a given country to inspect
NOTE: this is for testing purposes only

In [None]:
print_for_hive = False
do_execute = False
country = 'Ghana'

# salt for UA/IP hash
salt = ''.join(choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(randint(8,16)))

# matching by last digit of IP is ~random means of sampling devices (no obvious skew)
# Due to IPv6, IP addresses can actually end in [0-9] or [a-f]
# There are still many more IPv4 addresses than IPv6 though, so using a [0-9] gives you ~5x more data than a letter
# North America has a slightly higher proportion of IPv6 so they are undersampled slightly through this method
# consider adding to innermost where clause: AND SUBSTR(ip, -1, 1) = {4}
ip_match = randint(0, 9)
        
# query for a given day
query_date = dt
    
query = """
    SELECT sha2(CONCAT(user_agent, client_ip, '{0}'), 256) AS user,
           MAX(CAST(COALESCE(cvd.page_id, -1) > 0 AS int)) AS is_covid,
           MAX(CAST(NOT is_pageview as int)) AS editor,
           COUNT(1) AS num_pviews,
           COUNT(1) < 500 AS power_user,
           CONV(sha2(SUBSTR(CONCAT(user_agent, client_ip, '{0}'), 256), 49), 16, 10) / 18446744073709551615 as rand_sample
      FROM wmf.webrequest w
      LEFT JOIN covid_pages cvd
           ON (w.page_id = cvd.page_id AND pageview_info['project'] = cvd.project)
     WHERE webrequest_source = 'text'
           AND normalized_host.project_family = 'wikipedia'
           AND geocoded_data['country'] = '{5}'
           AND year = {1} AND month = {2} AND day = {3}
           AND agent_type = 'user'
           AND access_method <> 'mobile app'
           AND ((is_pageview AND (namespace_id = 0 OR namespace_id = 1))
                OR (uri_query LIKE '%action=edit%' OR uri_query LIKE '%action=visualeditor%' OR uri_query LIKE '%&intestactions=edit&intestactionsdetail=full&uiprop=options%'))
         GROUP BY sha2(CONCAT(user_agent, client_ip, '{0}'), 256)
 """.format(salt, query_date.year, query_date.month, query_date.day, ip_match, country)    

table = 'isaacj.{0}_sessions_{1}'.format(country, datetime.strftime(query_date, "%Y%m%d"))
query = 'CREATE TABLE {0} AS {1}'.format(table, query)

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(re.sub(r'SUBSTR\(ip, -1, 1\) = [0-9]', "SUBSTR(ip, -1, 1) = <#>",
                 re.sub(r"CONCAT\(user_agent, client_ip, '[a-zA-Z0-9]+'\)", '<UH CLAUSE>', query)))

if do_execute:
    spark.sql(query)

# clear salt and ip_match so not accidentally retained
ip_match = None
salt = None