# Aggregation of Search Activity on Commons

Per [T258229](https://phabricator.wikimedia.org/T258229), we aggregate daily measurements of search activity on Commons. In this notebook, we gather statistics for both legacy search and Special:MediaSearch. Specifically, we gather data on the following six measurements:

* Number of search sessions
* Number of searches made
* Number of searches per session
* Search session length
* Click-through rate (to quickview and from quickview to file pages)
* Average position of clicked result in successful searches

In [20]:
import datetime as dt

import pandas as pd
import numpy as np

from wmfdata import spark, mariadb

## Configuring Timestamps

We'll call the day we're gathering data for `data_day`. We expect this notebook to be run the day after, which we'll call `next_day`. To ignore search sessions that started on the day before `data_day`, we also define that day as `previous_day`. Lastly, we set `limit_timestamp` to one hour after midnight UTC on `next_day` and use it as the cutoff for data. In other words, we expect search sessions that started on `data_day` to be completed within one hour of midnight.

In [21]:
next_day = dt.datetime.now(dt.timezone.utc).date()

data_day = next_day - dt.timedelta(days = 1)
previous_day = data_day - dt.timedelta(days = 1)

limit_timestamp = dt.datetime.combine(next_day, dt.time(hour = 1))
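
To make the window concrete, here is a minimal sketch that repeats the computation above for a fixed run date instead of `datetime.now()`. The date 2021-03-01 is a hypothetical value, chosen only so that the window crosses a month boundary.

```python
import datetime as dt

# Hypothetical run date, fixed so the example is reproducible.
next_day = dt.date(2021, 3, 1)

data_day = next_day - dt.timedelta(days=1)      # the day we aggregate
previous_day = data_day - dt.timedelta(days=1)  # used to exclude earlier sessions

# Cutoff: one hour after midnight on the day the notebook runs.
limit_timestamp = dt.datetime.combine(next_day, dt.time(hour=1))

print(previous_day, data_day, next_day)
print(limit_timestamp.isoformat())
```

Note that `limit_timestamp` is a naive `datetime`, so its ISO string carries no time zone suffix; the notebook treats it as UTC.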

## Aggregation Tables

We define a set of tables in the Data Lake for aggregation of results.

In [22]:
search_count_table_name = 'wmf_product.commons_search_counts'

In [5]:
create_search_count_query = '''
CREATE TABLE {table_name} (
    log_date DATE COMMENT "the date of the aggregated search counts",
    wiki STRING COMMENT "the project name of search counts",
    num_legacy_sessions BIGINT COMMENT "the number of legacy search sessions",
    num_autocomplete_searches BIGINT COMMENT "the number of autocomplete searches",
    num_fulltext_successful_searches BIGINT COMMENT "the number of fulltext searches with results",
    num_fulltext_zeroresult_searches BIGINT COMMENT "the number of fulltext searches with no results",
    median_autocomp_searches_per_session DOUBLE COMMENT "median number of autocomplete searches per session",
    median_ft_success_searches_per_session DOUBLE COMMENT "median number of fulltext searches with results per session",
    median_ft_zero_searches_per_session DOUBLE COMMENT "median number of fulltext searches with no results per session",
    median_legacy_session_length DOUBLE COMMENT "median length of a search session, in seconds",
    num_mediasearch_sessions BIGINT COMMENT "the number of MediaSearch search sessions",
    num_mediasearch_searches BIGINT COMMENT "the number of searches made in MediaSearch sessions",
    median_mediasearch_searches_per_session DOUBLE COMMENT "median number of searches, per MediaSearch session",
    median_mediasearch_session_length DOUBLE COMMENT "median length of a MediaSearch session, in seconds"
)
'''

## Helper Functions

In [24]:
def make_partition_statement(start_ts, end_ts, prefix = ''):
    '''
    This takes the two timestamps and creates a statement that selects
    partitions based on `year`, `month`, and `day` in order to make our
    data gathering not use excessive amounts of data. It assumes that
    `start_ts` and `end_ts` are not more than a month apart, which should
    be a reasonable expectation for this notebook.
    
    An optional prefix can be set to enable selecting partitions for
    multiple tables with different aliases.
    
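    :param start_ts: start timestamp (only `year`, `month`, and `day` are used)
    :type start_ts: datetime.date or datetime.datetime
    
    :param end_ts: end timestamp (only `year`, `month`, and `day` are used)
    :type end_ts: datetime.date or datetime.datetime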
    
    :param prefix: prefix to use in front of partition clauses, "." is added automatically
    :type prefix: str
    '''
    
    if prefix:
        prefix = f'{prefix}.' # adds "." after the prefix
    
    # there are three cases:
    # 1: month and year are the same, output a "BETWEEN" statement with the days
    # 2: months differ, but the years are the same.
    # 3: years differ too.
    # Case #2 and #3 can be combined, because it doesn't really matter
    # if the years are the same in the month-selection or not.
    
    if start_ts.year == end_ts.year and start_ts.month == end_ts.month:
        return(f'''{prefix}year = {start_ts.year}
AND {prefix}month = {start_ts.month}
AND {prefix}day BETWEEN {start_ts.day} AND {end_ts.day}''')
    else:
        return(f'''
(
    ({prefix}year = {start_ts.year}
     AND {prefix}month = {start_ts.month}
     AND {prefix}day >= {start_ts.day})
 OR ({prefix}year = {end_ts.year}
     AND {prefix}month = {end_ts.month}
     AND {prefix}day <= {end_ts.day})
)''')
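
As an illustration of the two branches, the sketch below calls the helper once with dates in the same month and once across a month boundary. The function body is a condensed copy of the helper above with simplified whitespace, repeated only so the snippet runs on its own; the dates and prefixes are hypothetical.

```python
import datetime as dt

def make_partition_statement(start_ts, end_ts, prefix=''):
    # Condensed copy of the helper above (whitespace simplified),
    # repeated only so that this snippet runs on its own.
    if prefix:
        prefix = f'{prefix}.'
    if (start_ts.year, start_ts.month) == (end_ts.year, end_ts.month):
        return (f'{prefix}year = {start_ts.year} '
                f'AND {prefix}month = {start_ts.month} '
                f'AND {prefix}day BETWEEN {start_ts.day} AND {end_ts.day}')
    return (f'(({prefix}year = {start_ts.year} '
            f'AND {prefix}month = {start_ts.month} '
            f'AND {prefix}day >= {start_ts.day}) '
            f'OR ({prefix}year = {end_ts.year} '
            f'AND {prefix}month = {end_ts.month} '
            f'AND {prefix}day <= {end_ts.day}))')

# Case 1: same year and month, so a single BETWEEN clause on the day.
same_month = make_partition_statement(
    dt.date(2021, 2, 23), dt.date(2021, 2, 25), prefix='ess')

# Cases 2 and 3: the window crosses a month boundary, so two clauses joined by OR.
cross_month = make_partition_statement(
    dt.date(2021, 2, 27), dt.date(2021, 3, 1), prefix='ms')

print(same_month)
print(cross_month)
```

Because the helper only reads `year`, `month`, and `day`, it works with `datetime.date` objects as well as `datetime.datetime` objects.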

## Number of Searches

Note: I pulled a week's worth of searches on Commons and found the difference between `meta.dt` and `client_dt` to be very small, typically within a minute. Instead of doing time math to exclude sessions with large drifts, we coalesce the two timestamps and ignore sessions that fall outside our defined time window, which should be a reasonable simplification given how small the drift is.
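
The coalesce-and-filter idea can be sketched in plain Python. This is only an illustration under stated assumptions: the events are hypothetical, and the timestamps are assumed to be ISO 8601 strings in UTC, as the `unix_timestamp` format pattern used in the query suggests.

```python
from typing import Optional

def effective_timestamp(client_dt: Optional[str], meta_dt: Optional[str]) -> Optional[str]:
    """Return the first non-null timestamp, like SQL's COALESCE(client_dt, meta.dt)."""
    return client_dt if client_dt is not None else meta_dt

# Hypothetical events as (client_dt, meta.dt) pairs.
events = [
    ('2021-02-24T23:58:00.000Z', '2021-02-24T23:58:01.000Z'),
    (None, '2021-02-25T00:30:00.000Z'),  # client_dt missing: fall back to meta.dt
    ('2021-02-25T01:15:00.000Z', '2021-02-25T01:15:02.000Z'),  # after the cutoff
]

# Hypothetical cutoff, formatted like limit_timestamp.isoformat().
limit_timestamp = '2021-02-25T01:00:00'

# ISO 8601 strings with the same layout and time zone sort chronologically,
# so a plain string comparison is enough here.
kept = [
    ts
    for ts in (effective_timestamp(c, m) for c, m in events)
    if ts is not None and ts < limit_timestamp
]

print(kept)
```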

The query below limits legacy search sessions to those with fewer than 50 searches (autocomplete searches plus fulltext searches with results). This focuses the analysis on non-automated traffic and is an approach that's been used in search analysis in the past, for example when we gathered baseline metrics for legacy search in [T258723](https://phabricator.wikimedia.org/T258723).

Note that we use medians (computed with `percentile()`) rather than means as the "average" number of searches made per session, because we've previously found that search activity has a long-tail distribution.
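
A small sketch of why the median is the safer summary for long-tailed counts, and of what the 50-search cutoff does. The per-session counts below are made up for illustration, and the example uses Python's `statistics` module rather than the `percentile()` function the query relies on.

```python
import statistics

# Hypothetical searches-per-session counts: most sessions are short,
# one is long, and one (120 searches) looks automated.
searches_per_session = [1, 1, 1, 2, 2, 3, 40, 120]

# Analogous to the query's cutoff: keep sessions with fewer than 50 searches.
human_sessions = [n for n in searches_per_session if n < 50]

mean_searches = statistics.mean(human_sessions)
median_searches = statistics.median(human_sessions)

print(f'mean = {mean_searches:.2f}, median = {median_searches}')
```

Even after the cutoff removes the 120-search session, the single 40-search session pulls the mean far above what a typical session looks like, while the median stays at the value most sessions cluster around.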

In [25]:
search_count_query = '''
WITH legacy_sessions AS ( -- all legacy search sessions started during the day of interest
    SELECT
        wiki,
        event.searchsessionid AS session_id,
        MIN(coalesce(client_dt, meta.dt)) AS session_start_dt
    FROM event.searchsatisfaction AS ess
    WHERE {ess_partition_statement}
    AND wiki = "commonswiki"
    AND useragent.is_bot = false
    AND event.subTest IS NULL
    AND event.action = "searchResultPage"
    AND event.isforced IS NULL -- only include non-test users
    GROUP BY wiki, event.searchsessionid
    HAVING TO_DATE(session_start_dt) = "{today}"
),
legacy_session_end AS ( -- timestamp of last event in a valid legacy search session
    SELECT
        ls.wiki,
        event.searchsessionid AS session_id,
        MAX(coalesce(client_dt, meta.dt)) AS session_end_dt
    FROM legacy_sessions AS ls
    INNER JOIN event.searchsatisfaction AS ess
    ON ls.session_id = ess.event.searchsessionid
    WHERE {ess_partition_statement}
    AND event.action != "checkin" -- don't count page checkins where a user is reading a page
    AND coalesce(client_dt, meta.dt) < "{limit_timestamp}" -- until end of our data window
    GROUP BY ls.wiki,event.searchsessionid
),
legacy_counts AS ( -- count of searches based on valid legacy search sessions
    SELECT
        ls.wiki,
        event.searchsessionid AS session_id,
        COUNT(DISTINCT
                IF(event.source = "autocomplete", event.searchsessionid, NULL),
                IF(event.source = "autocomplete", event.pageviewid, NULL)) AS num_autocomplete_searches,
        COUNT(IF(event.source = "fulltext"
            AND event.hitsReturned > 0 , 1, NULL)) AS num_fulltext_successful_searches,
        COUNT(IF(event.source = "fulltext"
            AND event.hitsReturned IS NULL , 1, NULL)) AS num_fulltext_zeroresult_searches
    FROM legacy_sessions AS ls
    INNER JOIN event.searchsatisfaction AS ess
    ON ls.session_id = ess.event.searchsessionid
    WHERE {ess_partition_statement}
    AND coalesce(client_dt, meta.dt) < "{limit_timestamp}" -- until end of our data window
    AND event.action = "searchResultPage"
    GROUP BY ls.wiki, event.searchsessionid
),
mediasearch_sessions AS ( -- all MediaSearch sessions started during the day of interest
    SELECT
        CASE WHEN normalized_host.project = 'commons' THEN 'commonswiki' ELSE CONCAT(normalized_host.project,normalized_host.project_class) END AS wiki,
        web_pageview_id AS session_id,
        MIN(coalesce(dt, meta.dt)) AS session_start_dt
    FROM event.mediawiki_mediasearch_interaction AS ms
    WHERE {ms_partition_statement}
    AND action = "search_new"
    AND normalized_host.project IN ('pt', 'commons')
    GROUP BY CASE WHEN normalized_host.project = 'commons' THEN 'commonswiki' ELSE CONCAT(normalized_host.project,normalized_host.project_class) END, web_pageview_id
    HAVING TO_DATE(session_start_dt) = "{today}"
),
mediasearch_session_end AS ( -- timestamp of last event in a valid MediaSearch session
    SELECT
        wiki,
        web_pageview_id AS session_id,
        MAX(coalesce(dt, meta.dt)) AS session_end_dt
    FROM mediasearch_sessions AS mess
    INNER JOIN event.mediawiki_mediasearch_interaction AS ms
    ON mess.session_id = ms.web_pageview_id
    WHERE {ms_partition_statement}
    AND coalesce(dt, meta.dt) < "{limit_timestamp}" -- until end of our data window
    GROUP BY wiki, web_pageview_id
),
mediasearch_counts AS ( -- count of searches based on valid MediaSearch sessions
    SELECT
        wiki,
        web_pageview_id AS session_id,
        COUNT(1) AS num_mediasearch_searches
    FROM mediasearch_sessions AS mess
    INNER JOIN event.mediawiki_mediasearch_interaction AS ms
    ON mess.session_id = ms.web_pageview_id
    WHERE {ms_partition_statement}
    AND coalesce(dt, meta.dt) < "{limit_timestamp}" -- until end of our data window
    AND action = "search_new"
    GROUP BY wiki, web_pageview_id
),
legacy_stats AS ( -- statistics for legacy search
    SELECT
        TO_DATE(session_start_dt) AS log_date,
        ls.wiki,
        COUNT(1) AS num_legacy_sessions,
        SUM(num_autocomplete_searches) AS num_autocomplete_searches,
        SUM(num_fulltext_successful_searches) AS num_fulltext_successful_searches,
        SUM(num_fulltext_zeroresult_searches) AS num_fulltext_zeroresult_searches,
        percentile(
            unix_timestamp(session_end_dt, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") -
                unix_timestamp(session_start_dt, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
            0.5 -- median is the 50th percentile
        ) AS median_legacy_session_length,
        percentile(
            num_autocomplete_searches,
            0.5 -- median is the 50th percentile
        ) AS median_autocomp_searches_per_session,
        percentile(
            num_fulltext_successful_searches,
            0.5 -- median is the 50th percentile
        ) AS median_ft_success_searches_per_session,
        percentile(
            num_fulltext_zeroresult_searches,
            0.5 -- median is the 50th percentile
        ) AS median_ft_zero_searches_per_session
    FROM legacy_sessions AS ls
    INNER JOIN legacy_session_end AS lse
    ON (ls.session_id = lse.session_id AND ls.wiki = lse.wiki)
    INNER JOIN legacy_counts AS lc
    ON (ls.session_id = lc.session_id AND ls.wiki = lc.wiki)
    -- cutoff of 50 searches per session to remove automated traffic
    WHERE (num_autocomplete_searches + num_fulltext_successful_searches) < 50
    GROUP BY TO_DATE(session_start_dt),ls.wiki
),
mediasearch_stats AS ( -- statistics for MediaSearch
    SELECT
        TO_DATE(session_start_dt) AS log_date,
        mess.wiki,
        COUNT(1) AS num_mediasearch_sessions,
        SUM(num_mediasearch_searches) AS num_mediasearch_searches,
        percentile(
            unix_timestamp(session_end_dt, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") -
                unix_timestamp(session_start_dt, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
            0.5 -- median is the 50th percentile
        ) AS median_mediasearch_session_length,
        percentile(
            num_mediasearch_searches,
            0.5 -- median is the 50th percentile
        ) AS median_mediasearch_searches_per_session
    FROM mediasearch_sessions AS mess
    INNER JOIN mediasearch_session_end AS mse
    ON (mess.session_id = mse.session_id AND mess.wiki = mse.wiki)
    INNER JOIN mediasearch_counts AS mc
    ON (mess.session_id = mc.session_id AND mess.wiki = mc.wiki)
    GROUP BY TO_DATE(session_start_dt),mess.wiki
)

INSERT INTO {aggregate_table}
SELECT
    ms.log_date,
    ms.wiki,
    coalesce(ls.num_legacy_sessions, 0) AS num_legacy_sessions,
    coalesce(ls.num_autocomplete_searches,0) AS num_autocomplete_searches,
    coalesce(ls.num_fulltext_successful_searches,0) AS num_fulltext_successful_searches,
    coalesce(ls.num_fulltext_zeroresult_searches,0) AS num_fulltext_zeroresult_searches,
    coalesce(ls.median_autocomp_searches_per_session,0) AS median_autocomp_searches_per_session,
    coalesce(ls.median_ft_success_searches_per_session,0) AS median_ft_success_searches_per_session,
    coalesce(ls.median_ft_zero_searches_per_session,0) AS median_ft_zero_searches_per_session,
    coalesce(ls.median_legacy_session_length,0) AS median_legacy_session_length,
    coalesce(ms.num_mediasearch_sessions, 0) AS num_mediasearch_sessions,
    coalesce(ms.num_mediasearch_searches, 0) AS num_mediasearch_searches,
    coalesce(ms.median_mediasearch_searches_per_session, 0.0) AS median_mediasearch_searches_per_session,
    coalesce(ms.median_mediasearch_session_length, 0.0) AS median_mediasearch_session_length
FROM mediasearch_stats AS ms
LEFT JOIN legacy_stats AS ls
ON (ls.log_date = ms.log_date AND ls.wiki = ms.wiki)
'''

In [27]:
try:
    spark.run(search_count_query.format(
        today = data_day,
        limit_timestamp = limit_timestamp.isoformat(),
        ess_partition_statement = make_partition_statement(previous_day, next_day, prefix = 'ess'),
        ms_partition_statement = make_partition_statement(previous_day, next_day, prefix = 'ms'),
        aggregate_table = search_count_table_name
    ))
except UnboundLocalError:
    # wmfdata currently (late Feb 2021) has an issue with DDL/DML SQL queries,
    # and so we ignore that error
    pass
    

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


AnalysisException: '`wmf_product`.`commons_search_counts` requires that the data to be inserted have the same number of columns as the target table: target table has 13 column(s) but the inserted data has 14 column(s), including 0 partition column(s) having constant value(s).;'