In [2]:
# This notebook stores a list of unillustrated articles with suggested images in hdfs. See https://phabricator.wikimedia.org/T299789

import re
import math
import os
import os.path
from os import path
from wmfdata.spark import get_session
import subprocess

In [3]:
# Full snapshot date to process; edit this value to target a different snapshot.
snapshot = '2022-01-24'
# Pattern capturing the leading "<word>-<word>" prefix of the date
# (for '2022-01-24' that is '2022-01').
reg = r'^([\w]+-[\w]+)'
# Shortened snapshot, used by the queries that filter the wmf_raw.* tables.
short_snapshot = re.compile(reg).match(snapshot).group(0)

In [4]:
# Create the Spark session through the wmfdata helper.
# wmfdata relies on findspark to set up Spark's environment, which is why the
# pyspark imports are only issued once the session has been initialised.
spark = get_session(app_name="T299789", type='regular')
import pyspark
import pyspark.sql

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [26]:
# Read the parquet datasets produced by the previous scripts, then expose each
# one to Spark SQL as a temporary view named after the dataset.
#
# NOTE(review): the first dataset is read from an absolute path under a specific
# user's HDFS directory while the second uses a relative path - confirm that
# this difference is intentional.
commons_file_pages = spark.read.parquet('hdfs:/user/mfossati/commons_file_pages')
commons_files_related_wikidata_items = spark.read.parquet('commons_files_related_wikidata_items')

commons_file_pages.createOrReplaceTempView('commons_file_pages')
commons_files_related_wikidata_items.createOrReplaceTempView('commons_files_related_wikidata_items')

In [28]:
# Gather every candidate (wikidata item -> commons file) suggestion and register
# the result as the temporary view `all_suggestions`.
#
# Sources, each tagged with a `source` label and a confidence score
# (see https://phabricator.wikimedia.org/T301687):
#   * suggestion_qid_p18       - one row per entry of reverse_p18          -> 90
#   * suggestion_qid_p373      - one row per entry of reverse_p373; the
#                                wikidata id is the text before the "|"    -> 80
#   * suggestion_qid_leadImage - one row per entry of container_page_qids;
#                                the text before the "|" is the wikidata id
#                                and the text after it is collected into
#                                `found_on`                                -> 80
#   * suggestion_qid_commons   - commons statements whose property is P180,
#                                P6243 or P921 (the "depicts" suggestions) -> 70
#
# reverse_p18, reverse_p373 and container_page_qids come from the parquet
# written by the previous scripts; the commons statements come from
# structured_data.commons_entity for the configured snapshot.
#
# The four sources are combined with UNION, so exact duplicate rows are removed.
#
# The SPLIT separator is written as \\\\| so that Python emits \\| into the SQL
# text (Spark then un-escapes it to the regex \| , i.e. a literal pipe). The
# previous spelling, \\\| , produced the same text but only by relying on the
# invalid escape sequence "\|", which Python reports as deprecated.
query="""WITH suggestion_qid_p18 AS
(
 SELECT cfp.page_title, EXPLODE(cw.reverse_p18) as wikidataId, 'istype-wikidata-image' as source, NULL as found_on
 FROM commons_files_related_wikidata_items cw
 JOIN commons_file_pages cfp
 ON cfp.page_id=cw.page_id
 WHERE cw.reverse_p18 IS NOT NULL
),
suggestion_qid_p373 AS
(
 SELECT page_title, SPLIT(info,"\\\\|")[0] as wikidataId, 'istype-wikidata-commons-category' as source, NULL as found_on
 FROM
 (
     SELECT cfp.page_title as page_title, EXPLODE(cw.reverse_p373) as info
     FROM commons_files_related_wikidata_items cw
     JOIN commons_file_pages cfp
     ON cfp.page_id=cw.page_id
     WHERE cw.reverse_p373 IS NOT NULL
 )
),
suggestion_qid_leadImage AS
(
 SELECT page_title,
 SPLIT(info,"\\\\|")[0] as wikidataId,
 'istype-lead-image' as source,
 collect_set(SPLIT(info,"\\\\|")[1]) as found_on
 FROM
 (
     SELECT cfp.page_title as page_title, EXPLODE(cw.container_page_qids) as info
     FROM commons_files_related_wikidata_items cw
     JOIN commons_file_pages cfp
     ON cfp.page_id=cw.page_id
     WHERE cw.container_page_qids IS NOT NULL
 )
 GROUP BY page_title, wikidataId
),
commons_statements AS
(
 SELECT id AS mId,
 EXPLODE(statements) AS statement
 FROM structured_data.commons_entity WHERE snapshot='"""+snapshot+"""'
),
suggestion_qid_commons AS
(
 SELECT DISTINCT from_json(statement.mainsnak.datavalue.value, 'entityType STRING, numericId INT, id STRING').id AS wikidataId,
 cfp.page_title,
 'istype-depicts' as source,
 NULL as found_on
 FROM commons_statements cs
 JOIN commons_file_pages cfp
 ON cfp.page_id=SUBSTRING( cs.mId, 2 )
 WHERE statement.mainsnak.property IN ('P180', 'P6243', 'P921')
)
SELECT wikidataId, page_title AS suggestion, source, found_on, 90 as confidence FROM suggestion_qid_p18
UNION
SELECT wikidataId, page_title AS suggestion, source, found_on, 80 as confidence FROM suggestion_qid_p373
UNION
SELECT wikidataId, page_title AS suggestion, source, found_on, 80 as confidence FROM suggestion_qid_leadImage
UNION
SELECT wikidataId, page_title AS suggestion, source, found_on, 70 as confidence FROM suggestion_qid_commons
"""
wdSuggestionsDF = spark.sql(query)
wdSuggestionsDF.createOrReplaceTempView("all_suggestions")

In [30]:
# Articles whose wikidata item is an instance of a list, a year, a name, etc.
# should NOT be illustrated, so drop every suggestion attached to such an item
# and register what remains as the temporary view `filtered_suggestions`.
#
# Output columns: wikidataId, suggestion, sources (set of source labels),
# found_on, instance_of (set of the item's P31 values) and confidence_score
# (highest confidence among the merged rows).

unillustratable = [
    "Q577", # year
    "Q29964144", # calendar year
    "Q14795564", # recurrent timeframe
    "Q3311614", # century leap year
    "Q101352", # family name
    "Q82799", # name
    "Q4167410", # list
    "Q21199", # natural number
    "Q28920044", # positive integer
    "Q28920052", # non negative integer
]
# Quoted, comma-separated id list for the SQL IN (...) clause.
unillustratable_sql = "','".join(unillustratable)
# SQL expression extracting the target item id of an exploded claim.
instance_of_id = "from_json(claim.mainSnak.dataValue.value, 'entityType STRING, numericId INT, id STRING').id"

# The exclusion is applied per ITEM in the HAVING clause. Testing it in WHERE
# (as was done before) only removed the individual unillustratable P31 claims,
# so an item that was e.g. both a "year" and something else still got through.
#
# P31 claims without a usable target id are still ignored (IS NOT NULL), which
# matches what the former "NOT IN" test did for NULL ids.
#
# The table alias is `sug`; the previous alias was the SQL keyword `as`.
#
# NOTE(review): the WHERE conditions on `claim` make LATERAL VIEW OUTER behave
# like a plain LATERAL VIEW, so items without any P31 claim produce no rows -
# confirm that this is intended.
query = """
SELECT wikidataId, suggestion,
collect_set(source) AS sources,
found_on,
collect_set(""" + instance_of_id + """) as instance_of,
MAX(confidence) as confidence_score
FROM all_suggestions sug
JOIN wmf.wikidata_entity we
ON sug.wikidataId=we.id
LATERAL VIEW OUTER explode(we.claims) c AS claim
WHERE we.typ='item'
AND claim.mainSnak.property='P31'
AND we.snapshot='"""+snapshot+"""'
AND """ + instance_of_id + """ IS NOT NULL
GROUP BY wikidataId,suggestion,found_on
HAVING MAX(CASE WHEN """ + instance_of_id + """ IN ('""" + unillustratable_sql + """') THEN 1 ELSE 0 END) = 0
ORDER BY wikidataId,suggestion
"""
fsDF = spark.sql(query)
fsDF.createOrReplaceTempView("filtered_suggestions")



In [31]:
def _hdfs_path_exists(hdfs_path):
    """Return True when `hadoop fs -test -e` reports that hdfs_path exists.

    The command exits with status 0 when the path exists, non-zero otherwise.
    @todo replace with fsspec
    """
    return subprocess.run(['hadoop', 'fs', '-test', '-e', hdfs_path]).returncode == 0


def _replace_hdfs_path(staged_path, final_path):
    """Move the data written at staged_path to final_path, replacing any old data.

    Raises subprocess.CalledProcessError if a hadoop command fails.
    """
    # `hadoop fs -mv` onto an existing directory would move the staged data
    # inside it, so any previous data has to be removed first.
    if _hdfs_path_exists(final_path):
        subprocess.run(['hadoop', 'fs', '-rm', '-r', final_path], check=True)
    subprocess.run(['hadoop', 'fs', '-mv', staged_path, final_path], check=True)


languages=['ruwiki', 'ptwiki']
for wiki in languages:
    # Get all suggestions for unillustrated articles on the wiki: main-namespace,
    # non-redirect pages that do not link to any known commons file and whose
    # wikidata item has at least one filtered suggestion.
    query = """WITH illustrated_pages AS
    (
        SELECT DISTINCT il_from
        FROM wmf_raw.mediawiki_imagelinks il
        JOIN commons_file_pages cfp
        ON cfp.page_title=il.il_to
        WHERE il.wiki_db='"""+wiki+"""'
        AND il.snapshot='"""+short_snapshot+"""'
    )
    SELECT p.wiki_db,p.page_id,p.page_title,fs.suggestion,fs.sources,fs.found_on,fs.instance_of,fs.confidence_score,current_timestamp() as timestamp
    FROM wmf_raw.mediawiki_page p
    LEFT ANTI JOIN illustrated_pages il
    ON il.il_from=p.page_id
    JOIN wmf.wikidata_item_page_link wipl
    ON p.page_id=wipl.page_id
    JOIN filtered_suggestions fs
    ON wipl.item_id=fs.wikidataId
    WHERE p.page_namespace=0
    AND page_is_redirect=0
    AND p.wiki_db='"""+wiki+"""'
    AND p.snapshot='"""+short_snapshot+"""'
    AND wipl.wiki_db='"""+wiki+"""'
    AND wipl.snapshot='"""+snapshot+"""'
    """
    # Cached because the result is both written out and queried again below.
    suggestionsDF = spark.sql(query).cache()
    # NOTE: suggestions.all.<wiki> contains all the suggestions data that is needed by cassandra
    # for https://phabricator.wikimedia.org/T299885 for an individual wiki, so hopefully
    # all we'll need to do for that ticket is write the data to hdfs
    suggestionsDF.write.mode('overwrite').parquet('suggestions.all.' + wiki)
    suggestionsDF.createOrReplaceTempView("suggestions_for_wiki")

    # Get all pages with a suggestion, and write them to a file for import into the search index
    # (this should satisfy the requirements for https://phabricator.wikimedia.org/T299884).
    weighted_tags_path = 'suggestions.weighted_tags.' + wiki
    staging_path = weighted_tags_path + '.staging'

    # One row per page that currently has at least one suggestion.
    # (GROUP BY already yields a single row per page, so no DISTINCT is needed.)
    writes_query = """
        SELECT s.page_id,'""" + wiki + """' as wiki,'recommendation.image/' as tag, collect_set('exists|1') as values
        FROM suggestions_for_wiki as s
        GROUP BY s.page_id
        """

    if _hdfs_path_exists(weighted_tags_path):
        # We DO have previous data for this wiki: load it and find the pages that had a
        # (non-deleted) tag before but no longer have any suggestion, so that their tag
        # can be deleted from the index via a '__DELETE_GROUPING__' row.
        pageIdsWithSuggestionsPrevious = spark.read.parquet(weighted_tags_path)
        pageIdsWithSuggestionsPrevious.createOrReplaceTempView('previous_weighted_tags')
        query = """WITH writes as
        (""" + writes_query + """),
        previous as
        (
         SELECT DISTINCT p.page_id, EXPLODE(p.values) as previous_value
         FROM previous_weighted_tags p
         LEFT ANTI JOIN suggestions_for_wiki as s
         ON p.page_id=s.page_id
        )
        SELECT page_id,'""" + wiki + """' as wiki,'recommendation.image/' as tag, collect_set('__DELETE_GROUPING__') as values
        FROM previous
        WHERE previous_value!='__DELETE_GROUPING__'
        GROUP BY page_id
        UNION
        SELECT * FROM writes
        """
    else:
        # First run for this wiki: there is nothing to delete, only tags to write.
        query = writes_query
    pageIdsWithSuggestions = spark.sql(query)

    # Spark evaluates lazily and an overwrite clears its target before the job reads
    # anything, so writing straight onto weighted_tags_path would destroy the previous
    # data this query still has to read. Write to a staging path first, then swap it in.
    pageIdsWithSuggestions.write.mode('overwrite').parquet(staging_path)
    _replace_hdfs_path(staging_path, weighted_tags_path)

    # Release the cached suggestions before moving on to the next wiki.
    suggestionsDF.unpersist()
    