# Search Events Data

[T301902](https://phabricator.wikimedia.org/T301902)

This notebook pulls reduced search event data from the `searchsatisfaction` table.

We are interested in the following emerging languages for the search experiments:

Priority 1: Arabic, Bengali*, Spanish, Portuguese*, Russian

Priority 2: French*, Korean*, Indonesian, Ukrainian, Thai*, Malaysian (?), Hindi, Tagalog, Afrikaans, Cantonese, Malayalam, Telugu

In [1]:
import datetime as dt
import pandas as pd
import numpy as np

from wmfdata import hive, spark

You are using wmfdata v1.3.1, but v1.3.3 is available.

To update, run `pip install --upgrade git+https://github.com/wikimedia/wmfdata-python.git@release --ignore-installed`.

To see the changes, refer to https://github.com/wikimedia/wmfdata-python/blob/release/CHANGELOG.md


## Notes

Timestamps: we'll coalesce `client_dt` and `meta.dt` (as the query below does), and trust whatever comes out of it. When I investigated timestamps for searches on Commons I found peaks around the various hour intervals, but they're incredibly small compared to the number of events with a correct timestamp. To begin with, it's easier to trust these timestamps than to develop heuristics to change them.
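
As a rough illustration of the coalescing rule (this is not the query the notebook runs), the sketch below mirrors SQL `COALESCE` in plain Python: take the first value that is not null. The helper name `coalesce`, the dictionary keys, and the sample timestamps are all made up for the example.

```python
def coalesce(*values):
    """Return the first argument that is not None, like SQL COALESCE."""
    for value in values:
        if value is not None:
            return value
    return None

# Hypothetical events: the first timestamp is preferred, the second
# one is only used as a fallback when the first is missing.
events = [
    {"client_dt": "2022-04-01T10:15:00Z", "meta_dt": "2022-04-01T10:15:02Z"},
    {"client_dt": None, "meta_dt": "2022-04-01T11:30:00Z"},
]

timestamps = [coalesce(e["client_dt"], e["meta_dt"]) for e in events]
print(timestamps)
```

The second event falls back to its `meta_dt` value because its first timestamp is missing.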

Event logging in SearchSatisfaction is only done on the desktop platform.
TO DO: We will work with the web team to rebuild the search database for mobile web searches.

Users who have Do Not Track enabled are not part of the dataset.

In [2]:
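# Priority 1 wikis, pre-quoted so the string can be dropped into a SQL IN (...) clause
wiki = "'ruwiki', 'arwiki', 'bnwiki', 'eswiki', 'ptwiki'"
# Priority 2 wikis (database names use underscores, hence zh_yuewiki rather than zh-yuewiki)
wiki2 = "'frwiki', 'kowiki', 'idwiki', 'ukwiki', 'thwiki', 'mswiki', 'hiwiki', 'tlwiki', 'afwiki', 'zh_yuewiki', 'mlwiki', 'tewiki'"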
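
Hand-quoting these lists is easy to get wrong, so one option is to keep the wikis as a Python list and render the SQL fragment from it. The following is a minimal sketch under that assumption; `sql_string_list` and `priority_1` are hypothetical names, not part of this notebook or of `wmfdata`.

```python
def sql_string_list(names):
    """Render names as a comma-separated list of single-quoted SQL literals."""
    for name in names:
        # Wiki database names are plain identifiers, so refuse anything
        # that could break out of the quotes instead of trying to escape it.
        if not name.replace("_", "").replace("-", "").isalnum():
            raise ValueError(f"unexpected wiki name: {name!r}")
    return ", ".join(f"'{name}'" for name in names)

priority_1 = ["ruwiki", "arwiki", "bnwiki", "eswiki", "ptwiki"]
wiki_list = sql_string_list(priority_1)
print(wiki_list)
# prints: 'ruwiki', 'arwiki', 'bnwiki', 'eswiki', 'ptwiki'
```

The resulting string has the same shape as `wiki` above and could be passed to the query template in the same way.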

## Configuring Timestamps

We'll call the day we're gathering data for `data_day`. We're also expecting this notebook to be run the day after, which we'll call `next_day`. In order to ignore search sessions that started on the previous day, we also define that day. Lastly, we set a limit of one hour after midnight UTC as the cutoff for data. In other words, we expect search sessions to be completed within one hour.

In [3]:
next_day = dt.datetime.now(dt.timezone.utc).date()

data_day = next_day - dt.timedelta(days = 1)
previous_day = data_day - dt.timedelta(days = 1)

limit_timestamp = dt.datetime.combine(next_day, dt.time(hour = 1))
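
To make the relationship between these variables concrete, here is a small sketch that derives them from a fixed run date instead of the current time. The function name `day_window` and the example date are assumptions chosen for illustration; the arithmetic is the same as in the cell above.

```python
import datetime as dt

def day_window(run_date):
    """Derive the notebook's date variables from the day the job runs."""
    next_day = run_date
    data_day = next_day - dt.timedelta(days=1)
    previous_day = data_day - dt.timedelta(days=1)
    # Cutoff: one hour after midnight on the run day
    limit_timestamp = dt.datetime.combine(next_day, dt.time(hour=1))
    return {
        "previous_day": previous_day,
        "data_day": data_day,
        "next_day": next_day,
        "limit_timestamp": limit_timestamp,
    }

window = day_window(dt.date(2022, 4, 2))
print(window["previous_day"], window["data_day"], window["next_day"])
# prints: 2022-03-31 2022-04-01 2022-04-02
print(window["limit_timestamp"].isoformat())
# prints: 2022-04-02T01:00:00
```

A run on 2 April therefore gathers data for 1 April and reads partitions from 31 March through 2 April.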

## Create Aggregation Tables

In [4]:
event_table = 'cchen_search.search_events'

In [11]:
create_table_query = '''
CREATE TABLE IF NOT EXISTS {table_name} (
    dt STRING,
    wiki STRING,
    session_id STRING,
    unique_id STRING,
    action STRING,
    source STRING,
    input_location STRING,
    query STRING,
    results_returned BIGINT,
    click_position BIGINT,
    pageview_id STRING,
    user_is_bot BOOLEAN
)
'''

In [12]:
hive.run(create_table_query.format(
            table_name = event_table
))

## Timestamp Functions

In [7]:
def make_partition_statement(start_ts, end_ts, prefix = ''):
    '''
    This takes the two timestamps and creates a statement that selects
    partitions based on `year`, `month`, and `day` in order to make our
    data gathering not use excessive amounts of data. It assumes that
    `start_ts` and `end_ts` are not more than a month apart, which should
    be a reasonable expectation for this notebook.
    
    An optional prefix can be set to enable selecting partitions for
    multiple tables with different aliases.
    
    :param start_ts: start timestamp
    :type start_ts: datetime.datetime
    
    :param end_ts: end timestamp
    :type end_ts: datetime.datetime
    
    :param prefix: prefix to use in front of partition clauses, "." is added automatically
    :type prefix: str
    '''
    
    if prefix:
        prefix = f'{prefix}.' # adds "." after the prefix
    
    # there are three cases:
    # 1: month and year are the same, output a "BETWEEN" statement with the days
    # 2: months differ, but the years are the same.
    # 3: years differ too.
    # Case #2 and #3 can be combined, because it doesn't really matter
    # if the years are the same in the month-selection or not.
    
    if start_ts.year == end_ts.year and start_ts.month == end_ts.month:
        return(f'''{prefix}year = {start_ts.year}
AND {prefix}month = {start_ts.month}
AND {prefix}day BETWEEN {start_ts.day} AND {end_ts.day}''')
    else:
        return(f'''
(
    ({prefix}year = {start_ts.year}
     AND {prefix}month = {start_ts.month}
     AND {prefix}day >= {start_ts.day})
 OR ({prefix}year = {end_ts.year}
     AND {prefix}month = {end_ts.month}
     AND {prefix}day <= {end_ts.day})
)''')
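
The two branches are easiest to see with concrete dates. The sketch below uses a condensed, single-line copy of the function above, renamed `partition_clause` (a hypothetical name) so that it is self-contained and does not shadow the original. The whitespace of its output differs from `make_partition_statement`, but the predicates are meant to be the same.

```python
import datetime as dt

def partition_clause(start_ts, end_ts, prefix=''):
    """Condensed copy of make_partition_statement, for illustration only."""
    p = f'{prefix}.' if prefix else ''
    if (start_ts.year, start_ts.month) == (end_ts.year, end_ts.month):
        # Same month: a single BETWEEN over the days is enough
        return (f'{p}year = {start_ts.year} AND {p}month = {start_ts.month} '
                f'AND {p}day BETWEEN {start_ts.day} AND {end_ts.day}')
    # Different months: tail of the first month OR head of the second
    return (f'(({p}year = {start_ts.year} AND {p}month = {start_ts.month} '
            f'AND {p}day >= {start_ts.day}) '
            f'OR ({p}year = {end_ts.year} AND {p}month = {end_ts.month} '
            f'AND {p}day <= {end_ts.day}))')

same_month = partition_clause(dt.date(2022, 4, 2), dt.date(2022, 4, 4), prefix='ess')
cross_month = partition_clause(dt.date(2022, 3, 30), dt.date(2022, 4, 1), prefix='ess')
print(same_month)
print(cross_month)
```

The cross-month case only covers the end of the first month and the start of the second, which is why the docstring requires the two timestamps to be no more than a month apart.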


## Get Event Data

In [8]:
event_query = ''' 
    
    INSERT INTO cchen_search.search_events
    
    SELECT 
        MIN(coalesce(client_dt, meta.dt)) AS dt, 
        wiki AS wiki_db,
        event.searchsessionid AS session_id,
        event.uniqueid AS unique_id,
        event.action AS action,
        event.source AS source, 
        event.inputlocation AS input_location,
        event.query AS query,
        event.hitsReturned AS results_returned,
        event.position AS click_position,
        event.pageviewid AS pageview_id,
        useragent.is_bot AS user_is_bot
    FROM event.searchsatisfaction ess
    WHERE
        {ess_partition_statement}
        AND wiki IN ({wiki_db})
        AND event.subTest IS NULL
        AND event.isforced IS NULL -- only include non-test users
    GROUP BY 
        wiki,
        event.searchsessionid ,
        event.uniqueid ,
        event.action ,
        event.source , 
        event.inputlocation ,
        event.query ,
        event.hitsReturned , 
        event.position,
        event.pageviewid,
        useragent.is_bot
    HAVING
        TO_DATE(dt) = '{today}'
        
'''

In [13]:
# set up days
first_day = dt.date(2022, 4, 1) 
last_day = dt.date(2022, 4, 11)
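
Before launching a backfill it can help to preview which days a range like this actually covers, since each simulated run gathers data for the day before it. This is a minimal dry-run sketch that touches neither Hive nor Spark; `backfill_schedule` is a hypothetical helper, not part of this notebook.

```python
import datetime as dt

def backfill_schedule(first_day, last_day):
    """Yield (run_day, data_day) pairs; each run gathers the previous day's data."""
    run_day = first_day
    while run_day <= last_day:
        yield run_day, run_day - dt.timedelta(days=1)
        run_day += dt.timedelta(days=1)

schedule = list(backfill_schedule(dt.date(2022, 4, 1), dt.date(2022, 4, 11)))
print(len(schedule), 'runs')
print('first data day:', schedule[0][1])
print('last data day: ', schedule[-1][1])
```

With the range above this gives 11 runs covering data from 2022-03-31 through 2022-04-10. If the intent were to gather data for 1 to 11 April themselves, both bounds would need to move one day later.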

In [14]:
current_day = first_day

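while current_day <= last_day:
    # each iteration simulates the notebook running on `current_day`,
    # which means it gathers data for the day before
    next_day = current_day
    data_day = next_day - dt.timedelta(days = 1)
    previous_day = data_day - dt.timedelta(days = 1)

    limit_timestamp = dt.datetime.combine(next_day, dt.time(hour = 1))
    
    # print some helpful stuff
    print(f'running data gathering for {data_day} (simulating cron job on {current_day})')
    
    try:
        # note: `event_query` has no `{limit_timestamp}` placeholder, so
        # `str.format` silently ignores that argument
        spark.run(event_query.format(
            today = data_day,
            limit_timestamp = limit_timestamp.isoformat(),
            ess_partition_statement = make_partition_statement(previous_day, next_day, prefix = 'ess'),
            wiki_db = wiki
        ))
    except UnboundLocalError:
        # presumably a workaround: this wmfdata version appears to raise
        # UnboundLocalError for statements that return no result set
        pass
    
    current_day += dt.timedelta(days = 1)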

running data gathering for 2022-03-31 (simulating cron job on 2022-04-01)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-01 (simulating cron job on 2022-04-02)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-02 (simulating cron job on 2022-04-03)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-03 (simulating cron job on 2022-04-04)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-04 (simulating cron job on 2022-04-05)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-05 (simulating cron job on 2022-04-06)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-06 (simulating cron job on 2022-04-07)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-07 (simulating cron job on 2022-04-08)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-08 (simulating cron job on 2022-04-09)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-09 (simulating cron job on 2022-04-10)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.

running data gathering for 2022-04-10 (simulating cron job on 2022-04-11)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.