# Newcomer Task Equity Analysis
The goal of this analysis is to understand the impact of the Newcomer Tasks module (a Wikimedia Foundation recommender system) from the perspective of content equity. In particular, I examine the distribution of Wikipedia articles improved through the module along two dimensions: gender equity and geographic equity. In both cases, I ask whether the Newcomer Tasks system maintained the status quo (generally biased toward articles about men and about the United States or language-relevant countries) or led to contributions to a more balanced distribution of articles.

I look at Newcomer Tasks specifically for a few reasons:
* It's relatively straightforward
* It has good data -- i.e. both well-logged and sufficient usage to draw meaningful conclusions
* And most importantly, it allows editors to select topics of interest to them so I can investigate how this self-selection affects content equity.

NOTE: major thanks to Morten Warncke-Wang, who provided indispensable guidance and example code -- e.g., https://github.com/wikimedia-research/2021-Growth-structured-tasks/blob/main/T277355-leading-indicators.ipynb

## Setup

In [1]:
from collections import defaultdict
import os
import re

import pandas as pd

import wmfdata

In [2]:
spark = wmfdata.spark.get_session(app_name='pyspark regular; newcomereval',
                                  type='yarn-regular', # local, yarn-regular, yarn-large
                                  )  

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


## Parameters

In [3]:
print("Mediawiki partitions:")
spark.sql("SHOW PARTITIONS wmf_raw.mediawiki_project_namespace_map").show(50, False)

print("\nWikidata partitions:")
spark.sql("SHOW PARTITIONS wmf.wikidata_item_page_link").show(50, False)

Mediawiki partitions:
+------------------------+
|partition               |
+------------------------+
|snapshot=2016-12_private|
|snapshot=2017-07_private|
|snapshot=2021-03        |
|snapshot=2021-04        |
|snapshot=2021-05        |
|snapshot=2021-06        |
|snapshot=2021-07        |
|snapshot=2021-08        |
+------------------------+


Wikidata partitions:
+-------------------+
|partition          |
+-------------------+
|snapshot=2021-08-16|
|snapshot=2021-08-23|
|snapshot=2021-08-30|
|snapshot=2021-09-06|
|snapshot=2021-09-13|
|snapshot=2021-09-20|
+-------------------+



In [4]:
# Important parameters
mediawiki_snapshot = '2021-08'
wiki_dbs = ('viwiki','trwiki','ukwiki','svwiki','srwiki','ruwiki','ptwiki','kowiki','plwiki','hywiki','huwiki','hewiki','frwiki','fawiki','euwiki','cswiki','arwiki')
wikidata_snapshot = '2021-09-06'
newcomer_subset_tablename = 'isaacj.newcomer_tasks'
# There was a bug where >20% of edits were missing (especially those accepted w/o bias), so to avoid the uncertainty and bias
# that this would introduce into our results, we restrict to just the stable (and mostly complete) Newcomer Tasks data
# https://github.com/nettrom/Growth-homepage-2019/blob/master/T266610_missing_tag_impact.ipynb
# start_date should not be set earlier than '2020-10-28'
start_date = '2021-07-01'  # reduce amount of data so pipeline doesn't fail
known_users_tablename = 'known_users'
gen_table = 'isaacj.gender_wikidata'
geo_table = 'isaacj.qid_to_country_2021_08_02'


In [5]:
print("Gender data example:")
spark.sql(f'SELECT * FROM {gen_table} LIMIT 5').show(50, False)

print("\nGeography data example:")
spark.sql(f'SELECT * FROM {geo_table} LIMIT 5').show(50, False)

Gender data example:
+----------+--------+
|item_id   |gender  |
+----------+--------+
|Q100066   |Q6581072|
|Q100137735|Q6581097|
|Q100146397|Q6581097|
|Q100152085|Q6581097|
|Q100166821|Q6581097|
+----------+--------+


Geography data example:
+---------+--------+-------+
|qid      |property|country|
+---------+--------+-------+
|Q7068891 |P625    |Tuvalu |
|Q34967204|P625    |Tuvalu |
|Q11762732|P625    |Tuvalu |
|Q15256609|P625    |Tuvalu |
|Q3394461 |P625    |Tuvalu |
+---------+--------+-------+



In [6]:
def qid_to_gender_category(qid):
    """Map individual Wikidata gender values to a few broader categories so that long-tail values are more likely to be represented."""
    # male, male organism, eunuch, cisgender male
    if qid in ('Q6581097', 'Q44148', 'Q179294', 'Q15145778'):
        return 'male'
    # female, female organism, cisgender female
    elif qid in ('Q6581072', 'Q43445', 'Q15145779'):
        return 'female'
    # transgender male, transmasculine
    elif qid in ('Q2449503', 'Q27679766'):
        return 'transgender male'
    # transgender female, transfeminine
    elif qid in ('Q1052281', 'Q27679684'):
        return 'transgender female'
    # contains identities like non-binary, transgender person, two-spirit, genderfluid, etc.
    # See for more details: https://www.wikidata.org/wiki/Property_talk:P21
    else:
        return 'non-binary'
    
spark.udf.register('gender_lbl', qid_to_gender_category, 'String')

<function __main__.qid_to_gender_category(qid)>
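
As a quick sanity check of the grouping above, here is a minimal sketch that rebuilds the same mapping as a lookup table so it can be run without a Spark session. The names `GENDER_GROUPS`, `QID_TO_CATEGORY`, and `gender_category` are mine (not part of the pipeline), and `'Q_UNLISTED'` is a made-up placeholder for any value that is not explicitly listed.

```python
# Same QID groups as in qid_to_gender_category, expressed as a lookup table
GENDER_GROUPS = {
    'male': ('Q6581097', 'Q44148', 'Q179294', 'Q15145778'),
    'female': ('Q6581072', 'Q43445', 'Q15145779'),
    'transgender male': ('Q2449503', 'Q27679766'),
    'transgender female': ('Q1052281', 'Q27679684'),
}
QID_TO_CATEGORY = {qid: cat for cat, qids in GENDER_GROUPS.items() for qid in qids}


def gender_category(qid):
    """Return the coarse category; anything not listed falls through to 'non-binary'."""
    return QID_TO_CATEGORY.get(qid, 'non-binary')


# First two values come from the sample rows printed earlier; the third is a placeholder
sample = ['Q6581072', 'Q6581097', 'Q_UNLISTED']
labels = [gender_category(q) for q in sample]
print(labels)
```

One consequence worth keeping in mind: a missing value (`None`) also falls through to `'non-binary'`, so nulls need to be filtered out before the label is applied (the baseline query later does this with `gender IS NOT NULL`).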

### Gather Test Users
Certain users are known to be test accounts and should be excluded. Most shouldn't show up anyways based on dates selected but better safe than sorry :)

In [7]:
## Lists of known users to ignore (e.g. test accounts and experienced users)
known_users = defaultdict(set)
known_users['cswiki'].update([14, 127629, 303170, 342147, 349875, 44133, 100304, 307410, 439792, 444907,
                              454862, 456272, 454003, 454846, 92295, 387915, 398470, 416764, 44751, 132801,
                              137787, 138342, 268033, 275298, 317739, 320225, 328302, 339583, 341191,
                              357559, 392634, 398626, 404765, 420805, 429109, 443890, 448195, 448438,
                              453220, 453628, 453645, 453662, 453663, 453664, 440694, 427497, 272273,
                              458025, 458487, 458049, 59563, 118067, 188859, 191908, 314640, 390445,
                              451069, 459434, 460802, 460885, 79895, 448735, 453176, 467557, 467745,
                              468502, 468583, 468603, 474052, 475184, 475185, 475187, 475188, 294174,
                              402906, 298011])

known_users['kowiki'].update([303170, 342147, 349875, 189097, 362732, 384066, 416362, 38759, 495265,
                              515553, 537326, 566963, 567409, 416360, 414929, 470932, 472019, 485036,
                              532123, 558423, 571587, 575553, 576758, 360703, 561281, 595100, 595105,
                              595610, 596025, 596651, 596652, 596653, 596654, 596655, 596993, 942,
                              13810, 536529])

known_users['viwiki'].update([451842, 628512, 628513, 680081, 680083, 680084, 680085, 680086, 355424,
                              387563, 443216, 682713, 659235, 700934, 705406, 707272, 707303, 707681, 585762])

known_users['arwiki'].update([237660, 272774, 775023, 1175449, 1186377, 1506091, 1515147, 1538902,
                              1568858, 1681813, 1683215, 1699418, 1699419, 1699425, 1740419, 1759328, 1763990])

## Grab the user IDs of known test accounts so they can be added to the exclusion list

def get_known_users(wiki):
    '''
    Get user IDs of known test accounts on the given wiki and return them as a set.
    Accounts are matched by username prefix.
    '''
    
    username_patterns = ["MMiller", "Zilant", "Roan", "KHarlan", "MWang", "SBtest",
                         "Cloud", "Rho2019", "Test"]

    known_user_query = '''
    SELECT user_id
    FROM user
    WHERE user_name LIKE "{name_pattern}%"
    '''
    
    # local set; named differently from the module-level `known_users` dict to avoid confusion
    test_user_ids = set()
    
    for u_pattern in username_patterns:
        new_known = wmfdata.mariadb.run(known_user_query.format(
            name_pattern = u_pattern), wiki)
        test_user_ids |= set(new_known['user_id'])

    return test_user_ids
        
for wiki in wiki_dbs:
    known_users[wiki] = known_users[wiki] | get_known_users(wiki)
    print(wiki, len(known_users[wiki]))

viwiki 239
trwiki 195
ukwiki 182
svwiki 264
srwiki 70
ruwiki 690
ptwiki 553
kowiki 361
plwiki 339
hywiki 27
huwiki 102
hewiki 241
frwiki 1144
fawiki 172
euwiki 41
cswiki 351
arwiki 287


In [8]:
data = []
for wiki in known_users:
    for uid in known_users[wiki]:
        data.append((wiki, uid))
df = pd.DataFrame(data, columns=['wiki_db', 'user_id'])
spark.createDataFrame(df).createOrReplaceTempView(known_users_tablename)
spark.sql(f"SELECT * FROM {known_users_tablename} LIMIT 10").show(60, False)

+-------+-------+
|wiki_db|user_id|
+-------+-------+
|cswiki |460802 |
|cswiki |494595 |
|cswiki |515077 |
|cswiki |88069  |
|cswiki |338952 |
|cswiki |211978 |
|cswiki |453643 |
|cswiki |453644 |
|cswiki |453645 |
|cswiki |14     |
+-------+-------+
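
The temp view above is used as the right-hand side of a `LEFT ANTI JOIN` in the queries that follow. For readers less familiar with that join type, here is a small pandas sketch of the same semantics. All rows are made up for illustration and the variable names (`edits`, `excluded`, `kept`) are mine.

```python
import pandas as pd

# Made-up rows for illustration only; not data from the analysis
edits = pd.DataFrame({
    'wiki_db': ['cswiki', 'cswiki', 'kowiki'],
    'user_id': [14, 999001, 14],
    'revision_id': [1, 2, 3],
})
excluded = pd.DataFrame({'wiki_db': ['cswiki'], 'user_id': [14]})

# Left join with an indicator column, then keep only rows with no match in `excluded`
merged = edits.merge(excluded, on=['wiki_db', 'user_id'], how='left', indicator=True)
kept = merged[merged['_merge'] == 'left_only'].drop(columns='_merge')
print(kept)
```

Because the join key is the (wiki, user ID) pair, the same numeric user ID on a different wiki is not excluded, which matters here since user IDs are local to each wiki database.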



## Newcomer Edits
Gather data on newcomer edits as identified by the `newcomer task` edit tag

In [9]:
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {newcomer_subset_tablename} (
        wiki_db                         STRING        COMMENT 'Wiki -- e.g., enwiki for English Wikipedia',
        page_id                         BIGINT        COMMENT 'Page ID of the article in its wiki',
        page_title                      STRING        COMMENT 'Page title',
        qid                             STRING        COMMENT 'Wikidata item ID',
        user_id                         BIGINT        COMMENT 'User ID; -1 if anonymous',
        user_text                       STRING        COMMENT 'User text; IP address if anonymous',
        revision_id                     BIGINT        COMMENT 'Revision ID',
        parent_rev_id                   BIGINT        COMMENT 'Parent revision ID',
        revision_timestamp              TIMESTAMP     COMMENT 'Revision timestamp (UTC)',
        revision_is_identity_reverted   BOOLEAN       COMMENT 'Was this revision reverted via identity revert?',
        gender                          STRING        COMMENT 'Gender value if exists and is human (as QID)',
        regions                         ARRAY<STRING> COMMENT 'List of regions associated with items'
    )
"""

print(create_table_query)
spark.sql(create_table_query)


    CREATE TABLE IF NOT EXISTS isaacj.newcomer_tasks (
        wiki_db                         STRING        COMMENT 'Wiki -- e.g., enwiki for English Wikipedia',
        page_id                         BIGINT        COMMENT 'Page ID of the article in its wiki',
        page_title                      STRING        COMMENT 'Page title',
        qid                             STRING        COMMENT 'Wikidata item ID',
        user_id                         BIGINT        COMMENT 'User ID; -1 if anonymous',
        user_text                       STRING        COMMENT 'User text; IP address if anonymous',
        revision_id                     BIGINT        COMMENT 'Revision ID',
        parent_rev_id                   BIGINT        COMMENT 'Parent revision ID',
        revision_timestamp              TIMESTAMP     COMMENT 'Revision timestamp (UTC)',
        revision_is_identity_reverted   BOOLEAN       COMMENT 'Was this revision reverted via identity revert?',
        gender                      

DataFrame[]

In [10]:
# Populate table with all edits made in these wikis that originated from Newcomer Tasks (excluding known test users)
#
# CTE explanations:
# * ne_edits: gather all newcomer task edits from mediawiki history snapshot (edit tag is always in English)
# * pid_to_qid: build map of wiki+page ID -> Wikidata ID for joining in equity components
# * regions: build map of Wikidata ID -> any associated regions (this data has been precomputed)
# * newcomer_edits_with_qid: join in Wikidata IDs to edit data and remove known test users from data
# * newcomer_edits_with_equity_facets: join in region + gender data
# * INSERT OVERWRITE: fill in the table

print_for_hive = False
do_execute = True

query = f"""
WITH ne_edits AS (
  SELECT
    wiki_db,
    page_id,
    page_title,
    event_user_id AS user_id,
    REPLACE(event_user_text, ' ', '_') AS user_text,
    revision_id,
    revision_parent_id,
    CAST(event_timestamp AS TIMESTAMP) as revision_timestamp,
    revision_is_identity_reverted
  FROM wmf.mediawiki_history
  WHERE
    snapshot = '{mediawiki_snapshot}'
    AND wiki_db IN {wiki_dbs}
    AND page_namespace = 0
    AND NOT event_user_is_anonymous
    AND NOT SIZE(event_user_is_bot_by) > 0
    AND event_type = 'create'
    AND event_entity = 'revision'
    AND CAST(event_timestamp AS DATE) > '{start_date}'
    AND ARRAY_CONTAINS(revision_tags, 'newcomer task')
),
pid_to_qid AS (
    SELECT
      wiki_db,
      page_id,
      item_id
    FROM wmf.wikidata_item_page_link
    WHERE
      snapshot = '{wikidata_snapshot}'
      AND wiki_db in {wiki_dbs}
      AND page_namespace = 0
),
regions AS (
  SELECT
    qid AS qid,
    COLLECT_SET(country) AS regions
  FROM {geo_table} g
  INNER JOIN pid_to_qid p
    ON (g.qid = p.item_id)
  GROUP BY
    qid
),
newcomer_edits_with_qid AS (
  SELECT
    ne.*,
    pq.item_id AS qid
  FROM ne_edits ne
  LEFT ANTI JOIN {known_users_tablename} ku
    ON (ne.wiki_db = ku.wiki_db
        AND ne.user_id = ku.user_id)
  LEFT JOIN pid_to_qid pq
    ON (ne.wiki_db = pq.wiki_db
        AND ne.page_id = pq.page_id)
),
newcomer_edits_with_equity_facets AS (
  SELECT
    ne.*,
    g.gender AS gender,
    r.regions AS regions
  FROM newcomer_edits_with_qid ne
  LEFT JOIN {gen_table} g
    ON (ne.qid = g.item_id)
  LEFT JOIN regions r
    ON (ne.qid = r.qid)
)
INSERT OVERWRITE TABLE {newcomer_subset_tablename}
  SELECT
    wiki_db,
    page_id,
    page_title,
    qid,
    user_id,
    user_text,
    revision_id,
    revision_parent_id,
    revision_timestamp,
    revision_is_identity_reverted,
    gender,
    regions
  FROM newcomer_edits_with_equity_facets
"""

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)


WITH ne_edits AS (
  SELECT
    wiki_db,
    page_id,
    page_title,
    event_user_id AS user_id,
    REPLACE(event_user_text, ' ', '_') AS user_text,
    revision_id,
    revision_parent_id,
    CAST(event_timestamp AS TIMESTAMP) as revision_timestamp,
    revision_is_identity_reverted
  FROM wmf.mediawiki_history
  WHERE
    snapshot = '2021-08'
    AND wiki_db IN ('viwiki', 'trwiki', 'ukwiki', 'svwiki', 'srwiki', 'ruwiki', 'ptwiki', 'kowiki', 'plwiki', 'hywiki', 'huwiki', 'hewiki', 'frwiki', 'fawiki', 'euwiki', 'cswiki', 'arwiki')
    AND page_namespace = 0
    AND NOT event_user_is_anonymous
    AND NOT SIZE(event_user_is_bot_by) > 0
    AND event_type = 'create'
    AND event_entity = 'revision'
    AND CAST(event_timestamp AS DATE) > '2021-07-01'
    AND ARRAY_CONTAINS(revision_tags, 'newcomer task')
),
pid_to_qid AS (
    SELECT
      wiki_db,
      page_id,
      item_id
    FROM wmf.wikidata_item_page_link
    WHERE
      snapshot = '2021-09-06'
      AND wiki_db in ('viwik
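
The `regions` CTE collapses the geography table (one row per item/property/country) into one row per item with the set of distinct countries. A minimal pure-Python sketch of what `COLLECT_SET(country) ... GROUP BY qid` produces, using made-up QIDs and rows (only the column layout is taken from the geography example earlier):

```python
from collections import defaultdict

# Hypothetical (qid, property, country) rows, for illustration only
rows = [
    ('Q_A', 'P625', 'France'),
    ('Q_A', 'P17', 'France'),    # same country via a second property: collapsed by the set
    ('Q_A', 'P19', 'Belgium'),
    ('Q_B', 'P625', 'Tuvalu'),
]

# One entry per item holding its distinct countries, like COLLECT_SET + GROUP BY
regions = defaultdict(set)
for qid, _prop, country in rows:
    regions[qid].add(country)

for qid, countries in sorted(regions.items()):
    print(qid, sorted(countries))
```

Since the result is one row per item, the later `LEFT JOIN regions` cannot duplicate edit rows, and an item with no geographic data simply ends up with `regions` set to NULL.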

### Descriptive stats

In [11]:
# basic summary stats for edits for all wikis
# NOTE: num_rows should equal num_edits but occasionally a row is duplicated when joining in gender info
print_for_hive = False
do_execute = True

query = f"""
SELECT
  wiki_db,
  COUNT(1) AS num_rows,
  COUNT(DISTINCT(revision_id)) AS num_edits,
  COUNT(DISTINCT(page_id)) AS num_pages,
  COUNT(DISTINCT(user_id)) AS num_users,
  SUM(IF(gender IS NOT NULL, 1, 0)) AS edits_to_bios,
  SUM(IF(regions IS NOT NULL, 1, 0)) AS edits_to_geos,
  SUM(IF(revision_is_identity_reverted, 1, 0)) / COUNT(DISTINCT(revision_id)) AS pct_reverted
FROM {newcomer_subset_tablename}
GROUP BY
  wiki_db
ORDER BY
  num_edits DESC
"""

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    spark.sql(query).show(50, False)


SELECT
  wiki_db,
  COUNT(1) AS num_rows,
  COUNT(DISTINCT(revision_id)) AS num_edits,
  COUNT(DISTINCT(page_id)) AS num_pages,
  COUNT(DISTINCT(user_id)) AS num_users,
  SUM(IF(gender IS NOT NULL, 1, 0)) AS edits_to_bios,
  SUM(IF(regions IS NOT NULL, 1, 0)) AS edits_to_geos,
  SUM(IF(revision_is_identity_reverted, 1, 0)) / COUNT(DISTINCT(revision_id)) AS pct_reverted
FROM isaacj.newcomer_tasks
GROUP BY
  wiki_db
ORDER BY
  num_edits DESC

+-------+--------+---------+---------+---------+-------------+-------------+--------------------+
|wiki_db|num_rows|num_edits|num_pages|num_users|edits_to_bios|edits_to_geos|pct_reverted        |
+-------+--------+---------+---------+---------+-------------+-------------+--------------------+
|fawiki |5763    |5763     |4908     |617      |1039         |2775         |0.06819364914107236 |
|ruwiki |4667    |4666     |3491     |747      |1190         |2580         |0.04414916416630947 |
|frwiki |3752    |3751     |3388     |545      |1066         |20
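
The NOTE in the cell above mentions that a row is occasionally duplicated when joining in gender info, which is why `num_rows` can exceed `num_edits`. One plausible cause (an assumption on my part, not something verified against the tables) is an item with more than one gender value. The pandas sketch below uses made-up data to show the effect, and also how the `pct_reverted` formula as written would count a duplicated reverted row twice.

```python
import pandas as pd

# Made-up data: revision 102 edits an item that carries two gender values
edits = pd.DataFrame({
    'revision_id': [101, 102],
    'qid': ['Q_A', 'Q_B'],
    'reverted': [False, True],
})
gender = pd.DataFrame({
    'item_id': ['Q_B', 'Q_B'],
    'gender': ['Q6581097', 'Q6581072'],
})

joined = edits.merge(gender, left_on='qid', right_on='item_id', how='left')
num_rows = len(joined)                       # like COUNT(1)
num_edits = joined['revision_id'].nunique()  # like COUNT(DISTINCT revision_id)

# Same shape as the query: reverted rows divided by distinct revisions
pct_reverted_as_written = joined['reverted'].sum() / num_edits
# Alternative that counts each reverted revision only once
pct_reverted_dedup = joined.loc[joined['reverted'], 'revision_id'].nunique() / num_edits
print(num_rows, num_edits, pct_reverted_as_written, pct_reverted_dedup)
```

With only one or two duplicated rows per wiki the difference is negligible, but the deduplicated form is the safer one if the duplication rate ever grows.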

## Newcomer Clicks + Impressions
This data fills out the newcomer task funnel (impression -> click -> edit) and also allows us to inspect the impact of topic filtering. It is spread out across three tables:
* https://meta.wikimedia.org/wiki/Schema:NewcomerTask (`event.newcomertask`)
* https://meta.wikimedia.org/wiki/Schema:HelpPanel (`event.helppanel`)
* https://meta.wikimedia.org/wiki/Schema:HomepageModule (`event.homepagemodule`)

In [12]:
# Important parameters
events_subset_tablename = 'isaacj.newcomer_tasks_events'
# dates to match what was extracted from edit data
start_year = 2021
start_month = 7
start_day = 1
end_year = 2021
end_month = 9
end_day = 5
# these tables are updated in separate scripts but largely are extraction of data from Wikidata
gen_table = 'isaacj.gender_wikidata'
geo_table = 'isaacj.qid_to_country_2021_08_02'
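
The start/end parameters above feed a year/month/day partition predicate of the form `(year = start_year AND ...) AND (year = end_year AND ...)` in the event queries. That form is correct for a window inside a single year, as here, but it would silently return nothing for a window that crosses a year boundary. The sketch below mirrors the predicate in plain Python to show this; the function names and the cross-year window are hypothetical.

```python
from datetime import date


def in_window(year, month, day, start, end):
    """Mirror the SQL partition predicate; start and end are (year, month, day) tuples."""
    sy, sm, sd = start
    ey, em, ed = end
    after_start = year == sy and (month > sm or (month == sm and day >= sd))
    before_end = year == ey and (month < em or (month == em and day <= ed))
    return after_start and before_end


def in_window_by_date(year, month, day, start, end):
    """Alternative that compares whole dates, so it also works across a year boundary."""
    return date(*start) <= date(year, month, day) <= date(*end)


window = ((2021, 7, 1), (2021, 9, 5))        # the values set in the parameters cell
cross_year = ((2020, 12, 1), (2021, 1, 31))  # hypothetical window, not used in this analysis

same_year_ok = in_window(2021, 8, 15, *window) and in_window_by_date(2021, 8, 15, *window)
cross_year_sql_style = in_window(2020, 12, 15, *cross_year)
cross_year_by_date = in_window_by_date(2020, 12, 15, *cross_year)
print(same_year_ok, cross_year_sql_style, cross_year_by_date)
```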

In [13]:
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {events_subset_tablename} (
        wiki_db                         STRING        COMMENT 'Wiki -- e.g., enwiki for English Wikipedia',
        page_id                         BIGINT        COMMENT 'Page ID of the article in its wiki',
        page_title                      STRING        COMMENT 'Page title',
        parent_rev_id                   BIGINT        COMMENT 'Revision ID of article (when seen/clicked)',
        qid                             STRING        COMMENT 'Wikidata item ID',
        user_id                         BIGINT        COMMENT 'User ID',
        interaction_type                STRING        COMMENT 'impression or click',
        task_type                       STRING        COMMENT 'Type of structured task -- e.g., add-a-link',
        task_topic                      STRING        COMMENT 'Most relevant user-selected topic to recommendation',
        gender                          STRING        COMMENT 'Gender value if exists and is human (as QID)',
        regions                         ARRAY<STRING> COMMENT 'List of regions associated with items',
        year                            INT           COMMENT 'year',
        month                           INT           COMMENT 'month',
        day                             INT           COMMENT 'day'
    )
"""

print(create_table_query)
spark.sql(create_table_query)


    CREATE TABLE IF NOT EXISTS isaacj.newcomer_tasks_events (
        wiki_db                         STRING        COMMENT 'Wiki -- e.g., enwiki for English Wikipedia',
        page_id                         BIGINT        COMMENT 'Page ID of the article in its wiki',
        page_title                      STRING        COMMENT 'Page title',
        parent_rev_id                   BIGINT        COMMENT 'Revision ID of article (when seen/clicked)',
        qid                             STRING        COMMENT 'Wikidata item ID',
        user_id                         BIGINT        COMMENT 'User ID',
        interaction_type                STRING        COMMENT 'impression or click',
        task_type                       STRING        COMMENT 'Type of structured task -- e.g., add-a-link',
        task_topic                      STRING        COMMENT 'Most relevant user-selected topic to recommendation',
        gender                          STRING        COMMENT 'Gender value if exists and i

DataFrame[]

In [14]:
# Populate table with EventLogging data on impressions + clicks to recommendation cards.
#
# Explanation of CTEs:
# * newcomer_page_info: gather info on articles+topics with cards that were seen/clicked on. Distinct because I don't really care about duplicate clicks and it can be very noisy.
# * newcomer_action_info: gather info on users that interacted (saw/clicked) with newcomer task cards. This is split across the homepagemodule (homepage module) and helppanel (post-edit module) eventlogging. Distinct because I don't really care about duplicate views and it can be very noisy.
#   * NOTE: the different tables will have different interaction_types but functionally we can treat anything %impression as an impression and %click as a click.
# * newcomer_info: join page info with user info for complete data
# * newcomers_without_known_users: remove known users
# * pid_to_qid: get mapping of article page IDs -> Wikidata IDs to join in equity components
# * regions: gather relevant geographic info
# * newcomer_info_with_qid: add Wikidata IDs to impression/click data
# * newcomer_info_with_equity_facets: join in gender/geographic info
# * INSERT: load into table for further processing

print_for_hive = False
do_execute = True

query = f"""
WITH newcomer_page_info AS (
    SELECT DISTINCT
      event.newcomer_task_token AS newcomer_task_token,
      wiki as wiki_db,
      event.page_id AS page_id,
      event.page_title AS page_title,
      event.revision_id AS parent_rev_id,
      event.task_type AS task_type,
      COALESCE(event.topic, 'no-topic') AS task_topic,
      year,
      month,
      day
    FROM event.newcomertask
    WHERE
      (year = {start_year} AND ((month > {start_month} OR (month = {start_month} and day >= {start_day}))))
      AND (year = {end_year} AND ((month < {end_month} OR (month = {end_month} and day <= {end_day}))))
      AND wiki in {wiki_dbs}
),
newcomer_action_info AS (
    SELECT DISTINCT
      newcomer_task_token,
      user_id,
      interaction_type
    FROM
    (
        SELECT
          str_to_map(event.action_data, ';', '=')['newcomerTaskToken'] AS newcomer_task_token,
          event.user_id AS user_id,
          event.action AS interaction_type
        FROM event.homepagemodule
        WHERE
          (year = {start_year} AND ((month > {start_month} OR (month = {start_month} and day >= {start_day}))))
          AND (year = {end_year} AND ((month < {end_month} OR (month = {end_month} and day <= {end_day}))))
          AND wiki in {wiki_dbs}
          AND (event.action = 'se-task-click' OR event.action = 'se-task-impression')
        UNION ALL
        SELECT
          str_to_map(event.action_data, ';', '=')['newcomerTaskToken'] AS newcomer_task_token,
          event.user_id AS user_id,
          event.action AS interaction_type
        FROM event.helppanel
        WHERE
          (year = {start_year} AND ((month > {start_month} OR (month = {start_month} and day >= {start_day}))))
          AND (year = {end_year} AND ((month < {end_month} OR (month = {end_month} and day <= {end_day}))))
          AND wiki in {wiki_dbs}
          AND (event.action = 'postedit-task-click' OR event.action = 'postedit-impression')
    ) a
    WHERE
      newcomer_task_token IS NOT NULL
),
newcomer_info AS (
    SELECT
      wiki_db,
      page_id,
      page_title,
      parent_rev_id,
      user_id,
      interaction_type,
      task_type,
      task_topic,
      year,
      month,
      day
    FROM newcomer_page_info npi
    INNER JOIN newcomer_action_info nai
      ON (npi.newcomer_task_token = nai.newcomer_task_token)
),
newcomers_without_known_users AS (
    SELECT
      *
    FROM newcomer_info ni
    LEFT ANTI JOIN {known_users_tablename} ku
    ON (ni.wiki_db = ku.wiki_db
        AND ni.user_id = ku.user_id)
),
pid_to_qid AS (
    SELECT
      wiki_db,
      page_id,
      item_id
    FROM wmf.wikidata_item_page_link
    WHERE
      snapshot = '{wikidata_snapshot}'
      AND wiki_db in {wiki_dbs}
      AND page_namespace = 0
),
regions AS (
  SELECT
    qid AS qid,
    COLLECT_SET(country) AS regions
  FROM {geo_table} g
  INNER JOIN pid_to_qid p
    ON (g.qid = p.item_id)
  GROUP BY
    qid
),
newcomer_info_with_qid AS (
  SELECT
    ni.*,
    pq.item_id AS qid
  FROM newcomers_without_known_users ni
  LEFT JOIN pid_to_qid pq
    ON (ni.wiki_db = pq.wiki_db
        AND ni.page_id = pq.page_id)
),
newcomer_info_with_equity_facets AS (
  SELECT
    ni.*,
    g.gender AS gender,
    r.regions AS regions
  FROM newcomer_info_with_qid ni
  LEFT JOIN {gen_table} g
    ON (ni.qid = g.item_id)
  LEFT JOIN regions r
    ON (ni.qid = r.qid)
)
INSERT OVERWRITE TABLE {events_subset_tablename}
  SELECT
    wiki_db,
    page_id,
    page_title,
    parent_rev_id,
    qid,
    user_id,
    interaction_type,
    task_type,
    task_topic,
    gender,
    regions,
    year,
    month,
    day
  FROM newcomer_info_with_equity_facets
"""

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)


WITH newcomer_page_info AS (
    SELECT DISTINCT
      event.newcomer_task_token AS newcomer_task_token,
      wiki as wiki_db,
      event.page_id AS page_id,
      event.page_title AS page_title,
      event.revision_id AS parent_rev_id,
      event.task_type AS task_type,
      COALESCE(event.topic, 'no-topic') AS task_topic,
      year,
      month,
      day
    FROM event.newcomertask
    WHERE
      (year = 2021 AND ((month > 7 OR (month = 7 and day >= 1))))
      AND (year = 2021 AND ((month < 9 OR (month = 9 and day <= 5))))
      AND wiki in ('viwiki', 'trwiki', 'ukwiki', 'svwiki', 'srwiki', 'ruwiki', 'ptwiki', 'kowiki', 'plwiki', 'hywiki', 'huwiki', 'hewiki', 'frwiki', 'fawiki', 'euwiki', 'cswiki', 'arwiki')
),
newcomer_action_info AS (
    SELECT DISTINCT
      newcomer_task_token,
      user_id,
      interaction_type
    FROM
    (
        SELECT
          str_to_map(event.action_data, ';', '=')['newcomerTaskToken'] AS newcomer_task_token,
          event.user_id AS user
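
The query pulls the task token out of `event.action_data` with `str_to_map(event.action_data, ';', '=')['newcomerTaskToken']`. Below is a rough Python equivalent to make the parsing explicit. The sample `action_data` strings are invented (I am only assuming the `key=value;key=value` layout implied by the delimiters), and this sketch treats the delimiters as literal strings rather than patterns.

```python
def str_to_map(text, pair_delim=';', kv_delim='='):
    """Split 'k1=v1;k2=v2' into a dict; a pair without the key/value delimiter maps to None."""
    result = {}
    for pair in text.split(pair_delim):
        if not pair:
            continue  # skip empty segments, e.g. from a trailing ';'
        key, sep, value = pair.partition(kv_delim)
        result[key] = value if sep else None
    return result


# Invented examples of what action_data might look like
with_token = 'taskType=copyedit;newcomerTaskToken=abc123'
without_token = 'taskType=copyedit'

token = str_to_map(with_token).get('newcomerTaskToken')
missing = str_to_map(without_token).get('newcomerTaskToken')
print(token, missing)
```

Events whose `action_data` has no token come back as `None` here (NULL in SQL), which is what the `newcomer_task_token IS NOT NULL` filter drops before the join to the page info.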

### Descriptive stats

In [15]:
# basic summary stats for all wikis
print_for_hive = False
do_execute = True

query = f"""
SELECT
  wiki_db,
  interaction_type,
  COUNT(1) AS num_rows,
  COUNT(DISTINCT(user_id)) AS num_users,
  COUNT(DISTINCT(page_id)) AS num_pages,
  SUM(IF(gender IS NOT NULL, 1, 0)) AS events_to_bios,
  SUM(IF(regions IS NOT NULL, 1, 0)) AS events_to_geos,
  SUM(IF(task_topic ='no-topic', 0, 1)) AS events_with_topics
FROM {events_subset_tablename}
GROUP BY
  wiki_db,
  interaction_type
ORDER BY
  wiki_db,
  interaction_type
"""

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    spark.sql(query).show(100, False)


SELECT
  wiki_db,
  interaction_type,
  COUNT(1) AS num_rows,
  COUNT(DISTINCT(user_id)) AS num_users,
  COUNT(DISTINCT(page_id)) AS num_pages,
  SUM(IF(gender IS NOT NULL, 1, 0)) AS events_to_bios,
  SUM(IF(regions IS NOT NULL, 1, 0)) AS events_to_geos,
  SUM(IF(task_topic ='no-topic', 0, 1)) AS events_with_topics
FROM isaacj.newcomer_tasks_events
GROUP BY
  wiki_db,
  interaction_type
ORDER BY
  wiki_db,
  interaction_type

+-------+-------------------+--------+---------+---------+--------------+--------------+------------------+
|wiki_db|interaction_type   |num_rows|num_users|num_pages|events_to_bios|events_to_geos|events_with_topics|
+-------+-------------------+--------+---------+---------+--------------+--------------+------------------+
|arwiki |postedit-impression|3888    |629      |3596     |1074          |2088          |2999              |
|arwiki |postedit-task-click|1711    |203      |1643     |506           |918           |1418              |
|arwiki |se-task-click      |
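
As noted in the query comments, the homepage and post-edit modules log different `interaction_type` values, but anything ending in `impression` can be treated as an impression and anything ending in `click` as a click. A minimal sketch of that normalization, using the `1-impression` / `2-click` stage labels that appear in the power-user tables in the analysis section (the function name `funnel_stage` is mine):

```python
from collections import Counter


def funnel_stage(interaction_type):
    """Collapse module-specific interaction types into funnel stages."""
    if interaction_type.endswith('impression'):
        return '1-impression'
    if interaction_type.endswith('click'):
        return '2-click'
    raise ValueError(f'unexpected interaction type: {interaction_type}')


# The four action values selected by the event query
raw_types = ['postedit-impression', 'postedit-task-click', 'se-task-click', 'se-task-impression']
stage_counts = Counter(funnel_stage(t) for t in raw_types)
print(dict(stage_counts))
```

Prefixing the labels with a digit keeps the stages in funnel order when the results are sorted alphabetically.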

## Analysis

In [16]:
# NOTE: these are the wikis with >1000 pages edited (see descriptive stats above) and no major power users (data from the ad-hoc analyses section)
wikis_with_stable_data = ['fawiki', 'ruwiki', 'frwiki', 'arwiki']

# Some of the data on power contributors used to make the above determination
"""
+----------------+-------+-------+----------+--------------------+
|interaction_type|wiki_db|user_id|num_events|pct_events          |
+----------------+-------+-------+----------+--------------------+
|1-impression    |euwiki |< del >|542       |0.633177570093458   |
|1-impression    |huwiki |< del >|5933      |0.3558874692579929  |
|1-impression    |srwiki |< del >|1145      |0.2643120960295475  |
|1-impression    |hywiki |< del >|1090      |0.2342574683000215  |
|1-impression    |hywiki |< del >|997       |0.21427036320653342 |
|1-impression    |plwiki |< del >|6379      |0.15513132295719845 |
|1-impression    |svwiki |< del >|1061      |0.14789517702815724 |
|1-impression    |euwiki |< del >|124       |0.14485981308411214 |
|1-impression    |cswiki |< del >|2621      |0.12736903489163184 |
|1-impression    |ukwiki |< del >|1962      |0.11102308736985061 |
+----------------+-------+-------+----------+--------------------+

+----------------+-------+-------+----------+--------------------+
|interaction_type|wiki_db|user_id|num_events|pct_events          |
+----------------+-------+-------+----------+--------------------+
|2-click         |huwiki |< del >|1452      |0.7082926829268292  |
|2-click         |euwiki |< del >|27        |0.5510204081632653  |
|2-click         |plwiki |< del >|1051      |0.3651841556636553  |
|2-click         |euwiki |< del >|12        |0.24489795918367346 |
|2-click         |cswiki |< del >|363       |0.2061328790459966  |
|2-click         |hywiki |< del >|23        |0.18699186991869918 |
|2-click         |ukwiki |< del >|123       |0.13977272727272727 |
|2-click         |svwiki |< del >|49        |0.13764044943820225 |
|2-click         |kowiki |< del >|126       |0.1362162162162162  |
|2-click         |hywiki |< del >|15        |0.12195121951219512 |
|2-click         |viwiki |< del >|407       |0.12152881457151389 |
|2-click         |kowiki |< del >|104       |0.11243243243243244 |
|2-click         |cswiki |< del >|183       |0.10391822827938671 |
+----------------+-------+-------+----------+--------------------+

+----------------+-------+-------+----------+--------------------+
|interaction_type|wiki_db|user_id|num_events|pct_events          |
+----------------+-------+-------+----------+--------------------+
|3-edit          |huwiki |< del >|1320      |0.7942238267148014  |
|3-edit          |euwiki |< del >|9         |0.6                 |
|3-edit          |hywiki |< del >|14        |0.2978723404255319  |
|3-edit          |cswiki |< del >|210       |0.28532608695652173 |
|3-edit          |ukwiki |< del >|277       |0.2484304932735426  |
|3-edit          |plwiki |< del >|416       |0.22894881673087508 |
|3-edit          |kowiki |< del >|96        |0.17454545454545456 |
|3-edit          |svwiki |< del >|30        |0.17341040462427745 |
|3-edit          |srwiki |< del >|9         |0.17307692307692307 |
|3-edit          |hywiki |< del >|8         |0.1702127659574468  |
|3-edit          |ukwiki |< del >|188       |0.16860986547085202 |
|3-edit          |plwiki |< del >|304       |0.16730875068794718 |
|3-edit          |cswiki |< del >|117       |0.15896739130434784 |
|3-edit          |kowiki |< del >|84        |0.15272727272727274 |
|3-edit          |hywiki |< del >|7         |0.14893617021276595 |
|3-edit          |kowiki |< del >|77        |0.14                |
|3-edit          |srwiki |< del >|7         |0.1346153846153846  |
|3-edit          |euwiki |< del >|2         |0.13333333333333333 |
|3-edit          |euwiki |< del >|2         |0.13333333333333333 |
|3-edit          |euwiki |< del >|2         |0.13333333333333333 |
|3-edit          |viwiki |< del >|276       |0.12939521800281295 |
|3-edit          |svwiki |< del >|22        |0.12716763005780346 |
|3-edit          |ptwiki |< del >|220       |0.12304250559284116 |
|3-edit          |ukwiki |< del >|131       |0.11748878923766816 |
|3-edit          |viwiki |< del >|244       |0.11439287388654477 |
|3-edit          |cswiki |< del >|80        |0.10869565217391304 |
|3-edit          |svwiki |< del >|18        |0.10404624277456648 |
|3-edit          |ukwiki |< del >|114       |0.10224215246636771 |
+----------------+-------+-------+----------+--------------------+
"""
print(wikis_with_stable_data)

['fawiki', 'ruwiki', 'frwiki', 'arwiki']


In [17]:
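# gender equity for edits by language
#
# CTEs:
# * qids: gather all QIDs with a Wikipedia article in a given language
# * baseline: get baseline gender distribution for the language by joining qids with gender data (articles without gender ignored)
# * baseline_pct: convert counts into percentages
# * edits_per_user: get newcomer-task edits to articles with gender data (topic is NULL at this point)
# * actions_per_user: get impressions/clicks on articles with gender data, with the topic collapsed to 'topic' vs. 'no-topic'
# * imputed_topics: fill in the topic for each edit from the most recent impression/click by the same user on the same article
# * imputed_topics_counts: count users and events by interaction type, topic filter, and gender
# * SELECT: convert counts to percentages and join together event gender data and baseline gender data
# 
# NOTE: baseline is generally ~80% male, ~20% female, <1% non-binary / transgender but does vary a bit by wiki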

print_for_hive = False
do_execute = True

for wikidb in wikis_with_stable_data:
    print(f"\n== Analyzing {wikidb} ==")
    query = f"""
    with qids AS (
        SELECT
          item_id
        FROM wmf.wikidata_item_page_link
        WHERE
          snapshot = '{wikidata_snapshot}'
          AND wiki_db = '{wikidb}'
          AND page_namespace = 0
    ),
    baseline AS (
        SELECT
          gender_lbl(gender) AS gender_cat,
          COUNT(1) AS num_bios
        FROM {gen_table} g
        INNER JOIN qids q
          ON (g.item_id = q.item_id)
        WHERE
          gender IS NOT NULL
        GROUP BY
          gender_cat
    ),
    baseline_pct AS (
        SELECT
          gender_cat,
          num_bios / (SUM(num_bios) OVER ()) AS pct_bios
        FROM baseline
    ),
    edits_per_user AS (
        SELECT
          page_id,
          user_id,
          gender_lbl(gender) AS gender_cat,
          '3-edit' AS interaction_type,
          NULL as task_topic,
          CONCAT(YEAR(revision_timestamp),"-",MONTH(revision_timestamp),"-",DAY(revision_timestamp)) AS date
        FROM {newcomer_subset_tablename}
        WHERE
          wiki_db = '{wikidb}'
          AND gender IS NOT NULL
    ),
    actions_per_user AS (
        SELECT
          page_id,
          user_id,
          gender_lbl(gender) AS gender_cat,
          CASE
            WHEN interaction_type LIKE '%impression' THEN '1-impression'
            WHEN interaction_type LIKE '%click' THEN '2-click' 
            ELSE '0-unexpected'
          END AS interaction_type,
          CASE task_topic WHEN 'no-topic' THEN 'no-topic' ELSE 'topic' END AS task_topic,
          CONCAT(year, "-", month, "-", day) AS date
        FROM {events_subset_tablename}
        WHERE
          wiki_db = '{wikidb}'
          AND gender IS NOT NULL
    ),
    imputed_topics AS (
        SELECT
          page_id,
          user_id,
          gender_cat,
          interaction_type,
          LAST_VALUE(task_topic, TRUE) OVER w AS task_topic,
          date
        FROM (
            SELECT
              *
            FROM actions_per_user
            UNION ALL
            SELECT
              *
            FROM edits_per_user
        ) all
        WINDOW w AS (PARTITION BY page_id, user_id ORDER BY date ASC, interaction_type ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)    
    ),
    imputed_topics_counts AS (
        SELECT
          interaction_type,
          task_topic,
          gender_cat,
          COUNT(DISTINCT(user_id)) AS num_users,
          COUNT(1) AS num_events
        FROM imputed_topics
        WHERE
          task_topic IS NOT NULL
        GROUP BY
          interaction_type,
          task_topic,
          gender_cat
    )
    SELECT
      interaction_type,
      task_topic AS topic_filter,
      i.gender_cat AS gender,
      num_users,
      num_events,
      ROUND(num_events / SUM(num_events) OVER (PARTITION BY interaction_type, task_topic), 3) AS pct_events,
      ROUND(pct_bios, 3) AS pct_baseline
    FROM imputed_topics_counts i
    LEFT JOIN baseline_pct b
      ON (i.gender_cat = b.gender_cat)
    ORDER BY
      interaction_type,
      task_topic,
      num_events DESC
    """

    if do_execute:
        result = spark.sql(query)
        result.show(500, False)


== Analyzing fawiki ==
+----------------+------------+------------------+---------+----------+----------+------------+
|interaction_type|topic_filter|gender            |num_users|num_events|pct_events|pct_baseline|
+----------------+------------+------------------+---------+----------+----------+------------+
|1-impression    |no-topic    |male              |2205     |8693      |0.837     |0.788       |
|1-impression    |no-topic    |female            |754      |1685      |0.162     |0.21        |
|1-impression    |no-topic    |transgender female|2        |2         |0.0       |0.001       |
|1-impression    |no-topic    |non-binary        |1        |1         |0.0       |0.0         |
|1-impression    |topic       |male              |888      |8555      |0.79      |0.788       |
|1-impression    |topic       |female            |464      |2265      |0.209     |0.21        |
|1-impression    |topic       |non-binary        |4        |5         |0.0       |0.0         |
|1-impression   
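
One way to read the table above is as a percentage-point gap between `pct_events` and `pct_baseline`. The sketch below is not part of the original analysis: it copies the four male/female fawiki impression rows shown above into plain Python and computes that gap. The name `pp_diff` is made up for illustration.

```python
# Rows copied from the fawiki impression output above (male and female only):
# (topic_filter, gender, pct_events, pct_baseline)
rows = [
    ("no-topic", "male", 0.837, 0.788),
    ("no-topic", "female", 0.162, 0.210),
    ("topic", "male", 0.790, 0.788),
    ("topic", "female", 0.209, 0.210),
]

# Percentage-point gap: positive means the group appears more often in
# impressions than its share of biographies on the wiki would predict.
pp_diff = {
    (topic_filter, gender): round((pct_events - pct_baseline) * 100, 1)
    for topic_filter, gender, pct_events, pct_baseline in rows
}

for key, diff in pp_diff.items():
    print(key, f"{diff:+.1f} pp")
```

For these rows, impressions without a topic filter sit about five points above baseline for men, while topic-filtered impressions are within a fraction of a point of baseline. This covers only the fawiki impression rows visible above.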

In [18]:
# geographic breakdown of edits by wiki
# 
# CTEs:
# * qids: gather all QIDs with a Wikipedia article in a given language
# * baseline: get count of articles for each country on wiki (articles with no countries ignored)
# * baseline_pct: convert counts into percentages
#    * NOTE: a single article can have many associated countries but the percentages are still normalized to add to 100%
#    * so if e.g., 20% of geographic articles are attributed to UK here, it's maybe 30% of articles actually associated with the UK
# * edits_per_user / actions_per_user: get edit and impression/click data from newcomer tasks with one row per associated country
#    * If an article is associated with e.g., 3 countries, it'll show up 3 times, once with each country
# * imputed_topics: fill in the topic for each edit from the most recent impression/click by the same user on the same article
# * imputed_topics_counts: compute user and event counts stratified by geography
# * SELECT: convert counts to percentages and join together event geo data and baseline geo data
#    * NOTE: the WHERE filter runs before the window SUM, so pct_events is the share among the rows that pass the filter
# 
# NOTE: geographic baseline varies greatly by language though United States is usually top-3
# 
# NOTE: if desired, could use regions, continents, global north/south as geographic aggregations too
#   * Would just need to join against isaacj.country_to_region

print_for_hive = False
do_execute = True

for wikidb in wikis_with_stable_data:
    print(f"\n== Analyzing {wikidb} ==")
    query = f"""
    with qids AS (
        SELECT
          item_id
        FROM wmf.wikidata_item_page_link
        WHERE
          snapshot = '{wikidata_snapshot}'
          AND wiki_db = '{wikidb}'
          AND page_namespace = 0
    ),
    baseline AS (
        SELECT
          country,
          COUNT(DISTINCT(qid)) AS num_articles
        FROM {geo_table} g
        INNER JOIN qids q
          ON (g.qid = q.item_id)
        GROUP BY
          country
    ),
    baseline_pct AS (
        SELECT
          country,
          num_articles / (SUM(num_articles) OVER ()) AS pct_articles
        FROM baseline
    ),
    edits_per_user AS (
        SELECT
          page_id,
          user_id,
          EXPLODE(regions) AS country,
          '3-edit' AS interaction_type,
          NULL as task_topic,
          CONCAT(YEAR(revision_timestamp),"-",MONTH(revision_timestamp),"-",DAY(revision_timestamp)) AS date
        FROM {newcomer_subset_tablename}
        WHERE
          wiki_db = '{wikidb}'
          AND regions IS NOT NULL
          AND SIZE(regions) > 0
    ),
    actions_per_user AS (
        SELECT
          page_id,
          user_id,
          EXPLODE(regions) AS country,
          CASE
            WHEN interaction_type LIKE '%impression' THEN '1-impression'
            WHEN interaction_type LIKE '%click' THEN '2-click' 
            ELSE '0-unexpected'
          END AS interaction_type,
          CASE task_topic WHEN 'no-topic' THEN 'no-topic' ELSE 'topic' END AS task_topic,
          CONCAT(year, "-", month, "-", day) AS date
        FROM {events_subset_tablename}
        WHERE
          wiki_db = '{wikidb}'
          AND regions IS NOT NULL
          AND SIZE(regions) > 0
    ),
    imputed_topics AS (
        SELECT
          page_id,
          user_id,
          country,
          interaction_type,
          LAST_VALUE(task_topic, TRUE) OVER w AS task_topic,
          date
        FROM (
            SELECT
              *
            FROM actions_per_user
            UNION ALL
            SELECT
              *
            FROM edits_per_user
        ) all
        WINDOW w AS (PARTITION BY page_id, user_id ORDER BY date ASC, interaction_type ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)    
    ),
    imputed_topics_counts AS (
        SELECT
          interaction_type,
          task_topic,
          country,
          COUNT(DISTINCT(user_id)) AS num_users,
          COUNT(1) AS num_events
        FROM imputed_topics
        WHERE
          task_topic IS NOT NULL
        GROUP BY
          interaction_type,
          task_topic,
          country
    )
    SELECT
      interaction_type,
      task_topic AS topic_filter,
      i.country AS country,
      num_users,
      num_events,
      ROUND(num_events / SUM(num_events) OVER (PARTITION BY interaction_type, task_topic), 3) AS pct_events,
      ROUND(pct_articles, 3) AS pct_baseline
    FROM imputed_topics_counts i
    LEFT JOIN baseline_pct b
      ON (i.country = b.country)
    WHERE
      (pct_articles >= 0.01 OR num_events > 25)
    ORDER BY
      interaction_type,
      task_topic,
      num_events DESC
    """

    if do_execute:
        result = spark.sql(query)
        result.show(500, False)


== Analyzing fawiki ==
+----------------+------------+------------------------------+---------+----------+----------+------------+
|interaction_type|topic_filter|country                       |num_users|num_events|pct_events|pct_baseline|
+----------------+------------+------------------------------+---------+----------+----------+------------+
|1-impression    |no-topic    |Iran                          |1553     |4939      |0.237     |0.208       |
|1-impression    |no-topic    |United States of America      |1301     |3944      |0.189     |0.193       |
|1-impression    |no-topic    |United Kingdom                |694      |1449      |0.07      |0.052       |
|1-impression    |no-topic    |France                        |437      |772       |0.037     |0.036       |
|1-impression    |no-topic    |Japan                         |364      |725       |0.035     |0.028       |
|1-impression    |no-topic    |Germany                       |332      |592       |0.028     |0.044       |
|1-i
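
The normalization note in the cell above (one article, many countries) is easier to see on a toy example. The sketch below uses made-up articles and plain Python rather than the real tables. It assumes each article carries a list of countries, like the `regions` column that the query explodes.

```python
from collections import Counter

# Toy data (hypothetical): each article maps to zero or more countries,
# mirroring the `regions` array column that the query explodes.
article_countries = {
    "A": ["United Kingdom"],
    "B": ["United Kingdom", "France"],
    "C": ["France", "Germany", "Iran"],
    "D": [],  # no country: dropped, like SIZE(regions) > 0 in the query
}

# One row per (article, country) pair, as EXPLODE(regions) produces.
pairs = [(a, c) for a, countries in article_countries.items() for c in countries]

counts = Counter(country for _, country in pairs)
total_pairs = sum(counts.values())
geo_articles = sum(1 for countries in article_countries.values() if countries)

for country, n in counts.most_common():
    share_of_pairs = n / total_pairs      # normalized over pairs; sums to 1
    share_of_articles = n / geo_articles  # share of articles tagged with the country
    print(f"{country}: {share_of_pairs:.3f} of pairs, {share_of_articles:.3f} of articles")
```

Here the United Kingdom accounts for a third of the article-country pairs but is attached to two thirds of the articles that have any country. This is the same direction of gap that the 20% versus 30% remark describes.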

## Ad-hoc Analyses

### EventLogging loss

In [15]:
# How well can we impute topic for edits based on clicks/impressions?
# Some small percentage of edits lack eventlogging (presumably ad-blockers mainly)
# Hopefully this isn't too high -- e.g., much above 10% -- or biased (though that is hard to measure)

# NOTE: the complicated window function below does the following:
#   * it orders all the events for a given user + article chronologically (impressions, then clicks, then edits when a date is shared)
#   * for each row, it starts with that row's topic value and walks back towards the start of the list. The first non-null value is retained.
#   * So if there's already a topic, it's kept. If there isn't -- i.e. for edit data -- it takes the most recent topic for that user+article (from the click/impression data)
print_for_hive = False
do_execute = True

query = f"""
with edits_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      '3-edit' AS interaction_type,
      NULL as task_topic,
      CONCAT(YEAR(revision_timestamp),"-",MONTH(revision_timestamp),"-",DAY(revision_timestamp)) AS date
    FROM {newcomer_subset_tablename}
),
actions_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      CASE
        WHEN interaction_type LIKE '%impression' THEN '1-impression'
        WHEN interaction_type LIKE '%click' THEN '2-click' 
        ELSE '0-unexpected'
      END AS interaction_type,
      task_topic,
      CONCAT(year, "-", month, "-", day) AS date
    FROM {events_subset_tablename}
),
imputed_topics AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      interaction_type,
      LAST_VALUE(task_topic, TRUE) OVER w AS task_topic,
      date
    FROM (
        SELECT
          *
        FROM actions_per_user
        UNION ALL
        SELECT
          *
        FROM edits_per_user
    ) all
    WINDOW w AS (PARTITION BY wiki_db, page_id, user_id ORDER BY date ASC, interaction_type ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)    
)
SELECT
  wiki_db,
  interaction_type,
  COUNT(1) AS num_rows,
  SUM(IF(ISNULL(task_topic), 1, 0)) AS num_nulls,
  ROUND(SUM(IF(ISNULL(task_topic), 1, 0)) / COUNT(1), 3) AS pct_null
FROM imputed_topics
GROUP BY
  wiki_db,
  interaction_type
ORDER BY
  wiki_db,
  interaction_type
"""

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)
    result.show(500, False)


with edits_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      '3-edit' AS interaction_type,
      NULL as task_topic,
      CONCAT(YEAR(revision_timestamp),"-",MONTH(revision_timestamp),"-",DAY(revision_timestamp)) AS date
    FROM isaacj.newcomer_tasks
),
actions_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      CASE
        WHEN interaction_type LIKE '%impression' THEN '1-impression'
        WHEN interaction_type LIKE '%click' THEN '2-click' 
        ELSE '0-unexpected'
      END AS interaction_type,
      task_topic,
      CONCAT(year, "-", month, "-", day) AS date
    FROM isaacj.newcomer_tasks_events
),
imputed_topics AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      interaction_type,
      LAST_VALUE(task_topic, TRUE) OVER w AS task_topic,
      date
    FROM (
        SELECT
          *
        FROM actions_per_user
        UNION ALL
        SELECT
          *
        FROM edits_per_user
    ) all
    WINDOW w 
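
The `LAST_VALUE(task_topic, TRUE) OVER w` step can be hard to picture, so here is a minimal plain-Python sketch of the same idea on made-up events. It is an illustration of the technique, not the code that produced the results in this notebook. The users, pages, and topics are hypothetical.

```python
from collections import defaultdict

# Toy events (hypothetical): (user_id, page_id, date, interaction_type, task_topic).
# Dates are zero-padded here so that string order matches chronological order.
events = [
    (1, 10, "2021-03-01", "1-impression", "sports"),
    (1, 10, "2021-03-01", "2-click", "sports"),
    (1, 10, "2021-03-01", "3-edit", None),
    (1, 10, "2021-03-04", "1-impression", "no-topic"),
    (1, 10, "2021-03-05", "3-edit", None),
    (2, 20, "2021-03-02", "3-edit", None),  # edit with no logged impression/click
]

# PARTITION BY user + page
partitions = defaultdict(list)
for user_id, page_id, date, interaction_type, topic in events:
    partitions[(user_id, page_id)].append((date, interaction_type, topic))

imputed = []
for (user_id, page_id), rows in partitions.items():
    last_topic = None
    # ORDER BY date, interaction_type: the numeric prefixes put impressions
    # before clicks before edits when a date is shared.
    for date, interaction_type, topic in sorted(rows, key=lambda r: (r[0], r[1])):
        if topic is not None:
            last_topic = topic  # most recent non-null value so far
        imputed.append((user_id, page_id, date, interaction_type, last_topic))

edits = [row for row in imputed if row[3] == "3-edit"]
pct_null = sum(row[4] is None for row in edits) / len(edits)
print(edits)
print(round(pct_null, 3))

# Unpadded date strings do not sort chronologically:
print("2021-10-1" < "2021-9-5")  # True
```

The last line points at something worth checking in the query itself. If `year`, `month`, and `day` are unpadded integers (an assumption, since the table schema is not shown here), then the `CONCAT(...)` date strings compare lexicographically, and an October event could sort before a September one within the same user + article partition.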

### Relative Distribution of Each Task Type

In [35]:
# Breakdown of edits by types (imputed)
# Similar logic to above EventLogging audit
print_for_hive = False
do_execute = True

query = f"""
with edits_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      '3-edit' AS interaction_type,
      NULL as task_type,
      CONCAT(YEAR(revision_timestamp),"-",MONTH(revision_timestamp),"-",DAY(revision_timestamp)) AS date
    FROM {newcomer_subset_tablename}
),
actions_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      CASE
        WHEN interaction_type LIKE '%impression' THEN '1-impression'
        WHEN interaction_type LIKE '%click' THEN '2-click' 
        ELSE '0-unexpected'
      END AS interaction_type,
      task_type,
      CONCAT(year, "-", month, "-", day) AS date
    FROM {events_subset_tablename}
),
imputed_types AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      interaction_type,
      LAST_VALUE(task_type, TRUE) OVER w AS task_type,
      date
    FROM (
        SELECT
          *
        FROM actions_per_user
        UNION ALL
        SELECT
          *
        FROM edits_per_user
    ) all
    WINDOW w AS (PARTITION BY wiki_db, page_id, user_id ORDER BY date ASC, interaction_type ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)    
),
action_counts AS (
    SELECT
      wiki_db,
      interaction_type,
      task_type,
      COUNT(1) AS num_actions
    FROM imputed_types
    GROUP BY
      wiki_db,
      interaction_type,
      task_type
)
SELECT
  wiki_db,
  interaction_type,
  task_type,
  num_actions,
  ROUND(num_actions / SUM(num_actions) OVER (PARTITION BY wiki_db, interaction_type), 3) AS pct_actions
FROM action_counts
WHERE
  task_type IS NOT NULL
ORDER BY
  wiki_db,
  task_type,
  interaction_type
"""

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)
    result.show(500, False)


with edits_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      '3-edit' AS interaction_type,
      NULL as task_type,
      CONCAT(YEAR(revision_timestamp),"-",MONTH(revision_timestamp),"-",DAY(revision_timestamp)) AS date
    FROM isaacj.newcomer_tasks
),
actions_per_user AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      CASE
        WHEN interaction_type LIKE '%impression' THEN '1-impression'
        WHEN interaction_type LIKE '%click' THEN '2-click' 
        ELSE '0-unexpected'
      END AS interaction_type,
      task_type,
      CONCAT(year, "-", month, "-", day) AS date
    FROM isaacj.newcomer_tasks_events
),
imputed_types AS (
    SELECT
      wiki_db,
      page_id,
      user_id,
      interaction_type,
      LAST_VALUE(task_type, TRUE) OVER w AS task_type,
      date
    FROM (
        SELECT
          *
        FROM actions_per_user
        UNION ALL
        SELECT
          *
        FROM edits_per_user
    ) all
    WINDOW w AS (P
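
One detail of the final `SELECT` in the cell above is that SQL applies `WHERE task_type IS NOT NULL` before it evaluates the window `SUM`. That means `pct_actions` is a share of the actions with a known task type, not of all actions. The sketch below shows this on made-up counts. The task type names and numbers are placeholders, not results.

```python
from collections import defaultdict

# Toy output of the action_counts CTE (hypothetical numbers and task types):
# (wiki_db, interaction_type, task_type, num_actions)
action_counts = [
    ("fawiki", "3-edit", "copyedit", 60),
    ("fawiki", "3-edit", "links", 20),
    ("fawiki", "3-edit", None, 20),  # edits whose task type could not be imputed
]

# WHERE task_type IS NOT NULL is evaluated before the window function...
kept = [row for row in action_counts if row[2] is not None]

# ...so the SUM(...) OVER (PARTITION BY wiki_db, interaction_type) denominator
# only covers the rows that survived the filter.
totals = defaultdict(int)
for wiki_db, interaction_type, _, n in kept:
    totals[(wiki_db, interaction_type)] += n

pct_actions = {
    (wiki_db, interaction_type, task_type): round(n / totals[(wiki_db, interaction_type)], 3)
    for wiki_db, interaction_type, task_type, n in kept
}
print(pct_actions)
```

In this toy case the copyedit share is 0.75 of typed edits, although it is 0.60 of all edits. The size of that gap on real data depends on the null rate measured in the EventLogging audit.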

### Power User Examination

In [None]:
# power-user analysis by wiki -- i.e. do we have users who contribute a lot of the data (and so we should be suspicious of conclusions)

print_for_hive = False
do_execute = True

query = f"""
with actions AS (
    SELECT
      wiki_db,
      user_id,
      CASE
        WHEN interaction_type LIKE '%impression' THEN '1-impression'
        WHEN interaction_type LIKE '%click' THEN '2-click'
        ELSE interaction_type
      END AS interaction_type
    FROM {events_subset_tablename}
),
actions_per_user AS (
    SELECT
      wiki_db,
      user_id,
      interaction_type,
      COUNT(1) AS num_events
    FROM actions
    GROUP BY
      wiki_db,
      user_id,
      interaction_type
),
edits_per_user AS (
    SELECT
      wiki_db,
      user_id,
      COUNT(1) AS num_events
    FROM {newcomer_subset_tablename}
    GROUP BY
      wiki_db,
      user_id
)
SELECT
  interaction_type,
  wiki_db,
  user_id,
  num_events,
  pct_events
FROM (
    SELECT
      wiki_db,
      user_id,
      interaction_type,
      num_events,
      num_events / (SUM(num_events) OVER (PARTITION BY wiki_db, interaction_type)) AS pct_events
    FROM actions_per_user
    UNION ALL
    SELECT
      wiki_db,
      user_id,
      '3-edit' AS interaction_type,
      num_events,
      num_events / (SUM(num_events) OVER (PARTITION BY wiki_db)) AS pct_events
    FROM edits_per_user
) a
WHERE
  pct_events >= 0.02
ORDER BY
  interaction_type,
  pct_events DESC
LIMIT 500
"""

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)
    result.show(500, False)
    
# Output:
# +----------------+-------------+----------+--------------------+
# |interaction_type|user_id      |num_events|pct_events          |
# +----------------+-------------+----------+--------------------+
# |2-click         |<  deleted  >|1452      |0.03243460584805772 |
# |2-click         |<  deleted  >|1051      |0.023477114839055555|
# |3-edit          |<  deleted  >|1320      |0.04619584237418632 |
# |3-edit          |<  deleted  >|573       |0.02005319521243088 |
# +----------------+-------------+----------+--------------------+
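
The per-user share that the query computes can be reproduced on a toy edit log, which makes it easier to try other thresholds. This is a sketch with hypothetical users and counts, using `collections.Counter` in place of the Spark aggregation.

```python
from collections import Counter

THRESHOLD = 0.02  # same cut-off as the query

# Toy edit log (hypothetical): one (wiki_db, user_id) tuple per edit.
edits = (
    [("huwiki", "u1")] * 8
    + [("huwiki", "u2")] * 1
    + [("huwiki", "u3")] * 1
    + [("cswiki", "u4")] * 3
    + [("cswiki", "u5")] * 2
)

per_user = Counter(edits)                      # COUNT(1) GROUP BY wiki_db, user_id
per_wiki = Counter(wiki for wiki, _ in edits)  # SUM(num_events) OVER (PARTITION BY wiki_db)

# Keep users at or above the threshold, largest share first.
shares = sorted(
    (
        (wiki, user, n, n / per_wiki[wiki])
        for (wiki, user), n in per_user.items()
        if n / per_wiki[wiki] >= THRESHOLD
    ),
    key=lambda row: row[3],
    reverse=True,
)

for wiki, user, n, share in shares:
    print(f"{wiki} {user}: {n} edits, {share:.1%} of the wiki's edits")
```

A wiki where one account holds most of the edits, as `u1` does in this toy log, is one where equity results mostly describe that account's choices. The per-wiki output earlier in the notebook shows a case like this: one huwiki account with 1320 edits, about 79% of that wiki's newcomer-task edits.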