# Preparing Wikidata Items for topic classification  
### based on https://github.com/geohci/wikidata-topic-model-api

This notebook uses the [WMF's Hadoop Cluster](https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster). If you don't have access to that cluster, you will need to rewrite the code using the [Wikidata Dump](https://dumps.wikimedia.org/wikidata). 

Here we use two tables from the [Wikimedia Data Lake](https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake):

* [wmf.wikidata_item_page_link](https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Edits/Wikidata_item_page_link): Contains the mapping between Wikidata Items and page titles. It is equivalent to the 'sitelinks' value that you will find in the Wikidata Dump.

* [wmf.wikidata_entity](https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Edits/Wikidata_entity): From this table we extract the claims for each Wikidata Item. You will find equivalent information in the 'claims' field of the Wikidata Dump. 

This code is based on the [wikidata-topic-model-api](https://github.com/geohci/wikidata-topic-model-api). If you want to get the topic for a single Wikidata Item (or a small set of them), we recommend using this experimental API: https://tools.wmflabs.org/wiki-topic/
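
For readers without cluster access, the information in the two tables can be approximated from the JSON dump. The following is a minimal sketch, assuming the standard Wikidata JSON dump layout (one entity per line inside a JSON array, with `sitelinks` and `claims` fields). `parse_dump_line` and the sample line are hypothetical, written for illustration only, and unlike the query on `wikidata_item_page_link` this sketch does not filter by page namespace.

```python
import json


def parse_dump_line(line):
    """Parse one line of the JSON dump into (item_id, sitelinks, triples).

    Returns None for the array brackets and for empty lines.
    """
    line = line.strip().rstrip(",")  # every entity line ends with a comma
    if line in ("[", "]", ""):
        return None
    entity = json.loads(line)
    # sitelinks: site name -> page title
    sitelinks = {site: link["title"]
                 for site, link in entity.get("sitelinks", {}).items()}
    triples = []
    for prop, statements in entity.get("claims", {}).items():
        for statement in statements:
            datavalue = statement.get("mainsnak", {}).get("datavalue")
            # keep only claims whose object is another entity (Qx Py Qz)
            if datavalue and datavalue.get("type") == "wikibase-entityid":
                obj_id = datavalue["value"].get("id")
                if obj_id:
                    triples.append((entity["id"], prop, obj_id))
    return entity["id"], sitelinks, triples


# Hand-written sample line (not real dump output), with one entity claim
# and one string claim that should be ignored.
sample = json.dumps({
    "id": "Q42",
    "sitelinks": {"enwiki": {"site": "enwiki", "title": "Douglas Adams"}},
    "claims": {
        "P31": [{"mainsnak": {"property": "P31", "datavalue": {
            "type": "wikibase-entityid",
            "value": {"entity-type": "item", "numeric-id": 5, "id": "Q5"}}}}],
        "P373": [{"mainsnak": {"property": "P373", "datavalue": {
            "type": "string", "value": "Douglas Adams"}}}],
    },
}) + ","

print(parse_dump_line(sample))
```

In practice the dump is compressed and very large, so the lines would be streamed from the archive rather than loaded into memory at once.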

In [2]:
#check partitions on  wikidata_item_page_link 
spark.sql(''' 
 show partitions wmf.wikidata_item_page_link 
 ''').show()

+-------------------+
|          partition|
+-------------------+
|snapshot=2020-01-06|
|snapshot=2020-01-13|
|snapshot=2020-01-20|
|snapshot=2020-01-27|
|snapshot=2020-02-03|
|snapshot=2020-02-10|
|snapshot=2020-02-17|
|snapshot=2020-02-24|
|snapshot=2020-03-02|
|snapshot=2020-03-23|
|snapshot=2020-03-30|
+-------------------+



In [5]:
#get all Wikidata Items with a page in at least one project

QwithPage = spark.sql('''SELECT 
item_id as wikidata_item
FROM wmf.wikidata_item_page_link  
WHERE snapshot="2020-03-30" AND page_namespace=0 GROUP BY wikidata_item
''')

In [3]:
# Extract direct claims for Wikidata Items from wikidata_entity. 
directClaims = spark.sql("""
SELECT
  subject,
  claim.mainSnak.property AS predicate,
  claim.mainSnak.dataValue.value AS object,
  -- needed to correctly interpret the object value
  claim.mainSnak.dataValue.typ AS object_typ
FROM (
  SELECT
    id as subject,
    explode(claims) as claim
  FROM wmf.wikidata_entity
  WHERE snapshot = '2020-03-02' 
) t  
""")

In [7]:
#Restrict to Wikidata Items with a sitelink (i.e. the items in QwithPage)
directClaimsWithPages = directClaims.join(QwithPage,directClaims['subject']==QwithPage['wikidata_item'])

In [8]:
directClaimsWithPages.createTempView('directClaimsWithPages')

In [10]:
#Filter claims to those of type 'wikibase-entityid'. That means that we create triples Qx Py Qz,
#excluding other kinds of relations such as Qx Py 'string'

claims  = spark.sql('''SELECT subject, predicate, object
                    FROM  directClaimsWithPages WHERE  object_typ = 'wikibase-entityid'
''')

In [12]:
claims.show()

+--------+---------+--------------------+
| subject|predicate|              object|
+--------+---------+--------------------+
|Q1000211|    P1313|{"entity-type":"i...|
|Q1000211|      P17|{"entity-type":"i...|
|Q1000211|      P31|{"entity-type":"i...|
|Q1000211|     P131|{"entity-type":"i...|
|Q1000211|     P131|{"entity-type":"i...|
|Q1000211|     P131|{"entity-type":"i...|
|Q1000211|      P47|{"entity-type":"i...|
|Q1000211|      P47|{"entity-type":"i...|
|Q1000211|      P47|{"entity-type":"i...|
|Q1000211|      P47|{"entity-type":"i...|
|Q1000211|      P47|{"entity-type":"i...|
|Q1000211|      P47|{"entity-type":"i...|
|Q1000211|     P166|{"entity-type":"i...|
|Q1000919|      P31|{"entity-type":"i...|
|Q1000919|     P112|{"entity-type":"i...|
|Q1001340|      P31|{"entity-type":"i...|
|Q1001340|     P106|{"entity-type":"i...|
|Q1001340|      P20|{"entity-type":"i...|
|Q1001340|     P735|{"entity-type":"i...|
|Q1001340|      P21|{"entity-type":"i...|
+--------+---------+--------------------+

In [16]:
#We parse each claim's object (a JSON string) into a Python dictionary and keep only the claims whose object has an id
import json
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col, concat, lit


@udf(returnType=StringType())
def getId(obj):
    # json.loads is safer than eval, which would execute arbitrary expressions
    try:
        return json.loads(obj).get('id')
    except (TypeError, ValueError, AttributeError):
        # the object could not be parsed as a JSON dictionary
        return None

claims = claims.withColumn('id', getId(col("object")))
# drop claims without an id, then build one "Py Qz" token per claim
claimsWithId = claims.where(col('id').isNotNull()).select('subject','predicate','id')
claimsWithId = claimsWithId.withColumn('predicateToObject', concat(col("predicate"), lit(" "), col("id")))
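
The parsing step above can be checked outside Spark. Below is a minimal sketch in plain Python, assuming the `object` column holds a JSON string such as `{"entity-type":"item","numeric-id":5,"id":"Q5"}`. `claim_token` is a hypothetical helper used only for illustration; it is not part of the notebook or of pyspark.

```python
import json


def claim_token(predicate, obj):
    """Return the 'Py Qz' token for an entity-valued claim, or None if no id can be read."""
    try:
        value = json.loads(obj)
    except (TypeError, ValueError):
        # obj is None or not valid JSON
        return None
    entity_id = value.get("id") if isinstance(value, dict) else None
    return f"{predicate} {entity_id}" if entity_id else None


print(claim_token("P31", '{"entity-type":"item","numeric-id":5,"id":"Q5"}'))
print(claim_token("P31", "not json"))
```

Running the helper on a handful of sampled rows is a cheap way to confirm that the id extraction behaves as expected before launching the full job.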

In [17]:
# Given that we have exploded the claims (meaning that now for each claim we have a row)
# We group all claims by subject, concatenating all the claims, to create the Bag of Words that is the input
# of the wikidata-topic-model
import pyspark.sql.functions as f
allClaimsPerItem = claimsWithId.groupby('subject').agg(f.concat_ws(" ", f.collect_list(claimsWithId.predicateToObject)).alias('BoW'))

In [18]:
allClaimsPerItem.show()

+---------+--------------------+
|  subject|                 BoW|
+---------+--------------------+
| Q1000211|P1313 Q65191356 P...|
| Q1000919|P31 Q43229 P112 Q...|
| Q1001340|P31 Q5 P106 Q4260...|
| Q1001788|P1454 Q134161 P31...|
| Q1001929|P1435 Q19558910 P...|
| Q1002146|P31 Q1748957 P279...|
| Q1002610|        P31 Q4167410|
|Q10026565|P31 Q3464665 P179...|
| Q1003019|P1435 Q19558910 P...|
| Q1003138|P31 Q4830453 P159...|
| Q1003174|P361 Q1747183 P17...|
| Q1003185|P31 Q11173 P31 Q2...|
| Q1003336|P1313 Q65182521 P...|
| Q1003686|P1465 Q32665933 P...|
| Q1003747|P641 Q41466 P2348...|
| Q1004179|P611 Q846687 P31 ...|
|  Q100433|P734 Q26736379 P3...|
| Q1004468|P31 Q5 P106 Q3618...|
|  Q100453|P734 Q14947294 P3...|
|  Q100477|P641 Q2736 P54 Q4...|
+---------+--------------------+
only showing top 20 rows
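
The grouping step can be illustrated without Spark. This is a minimal sketch in plain Python of the same idea (one row per claim, collapsed into one space-separated string per subject). `build_bow` and the sample rows are hypothetical and only for illustration.

```python
from collections import defaultdict


def build_bow(rows):
    """Group (subject, 'Py Qz') rows into one space-separated string per subject."""
    tokens = defaultdict(list)
    for subject, token in rows:
        tokens[subject].append(token)
    return {subject: " ".join(parts) for subject, parts in tokens.items()}


# Illustrative rows, one per claim, as produced after exploding the claims
rows = [
    ("Q1001340", "P31 Q5"),
    ("Q1002610", "P31 Q4167410"),
    ("Q1001340", "P21 Q6581097"),
]
bow = build_bow(rows)
print(bow["Q1001340"])
```

Note that in Spark the order of the elements returned by `collect_list` is not guaranteed after a shuffle, so the order of tokens within a BoW may vary between runs; for a bag-of-words input this should not matter.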



In [19]:
# We save the results as CSV in HDFS
allClaimsPerItem.write.csv('claimsPerWikidaItemTopicInputAllWikidataItems.csv',header=True,mode='overwrite')


In [20]:
# Because we want to use the allClaimsPerItem data in a Python kernel without access to Hadoop,
# we copy the data out of the cluster and filter it (to remove the repeated CSV headers that appear
# because hadoop fs -text concatenates the part files, each of which has its own header)
!hadoop fs -text  claimsPerWikidaItemTopicInputAllWikidataItems.csv/* > claimsPerWikidaItemTopicInputAllWikidataItems.csv.tmp
!awk 'BEGIN{f=""}{if($0!=f){print $0}if(NR==1){f=$0}}' claimsPerWikidaItemTopicInputAllWikidataItems.csv.tmp > claimsPerWikidaItemTopicInputAllWikidataItems.csv
!rm claimsPerWikidaItemTopicInputAllWikidataItems.csv.tmp


20/04/11 00:28:28 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
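
Once the file is on local disk, it can be loaded in a kernel without Hadoop. The following is a minimal sketch using only the standard library, assuming the two-column layout (`subject`, `BoW`) written above. `read_bow_csv` is a hypothetical helper; it also skips any repeated header rows, so it works whether or not the `awk` filter was applied.

```python
import csv
import io


def read_bow_csv(lines, header=("subject", "BoW")):
    """Read subject -> BoW pairs, skipping empty rows and repeated header rows."""
    result = {}
    for row in csv.reader(lines):
        if not row or tuple(row) == tuple(header):
            continue
        subject, bow = row
        result[subject] = bow
    return result


# In-memory stand-in for the exported file, with a duplicated header
sample_file = io.StringIO(
    "subject,BoW\n"
    "Q1002610,P31 Q4167410\n"
    "subject,BoW\n"
    "Q1001340,P31 Q5\n"
)
bows = read_bow_csv(sample_file)
print(bows)
```

With the real export, an open file handle (for example `open(path, newline="")`) would be passed in place of the `StringIO` object.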
