In [4]:
# Install Python dependencies
!pip install edtf Wikidata

Collecting edtf
  Downloading edtf-4.0.1-py2.py3-none-any.whl (32 kB)
Collecting Wikidata
  Downloading Wikidata-0.7.0-py3-none-any.whl (29 kB)
Installing collected packages: Wikidata, edtf
Successfully installed Wikidata-0.7.0 edtf-4.0.1


In [23]:
# ACMI Wikidata importer for importing entity data to ACMI Creators

import datetime
import os

import requests
from edtf import parse_edtf, struct_time_to_date
from edtf.parser.edtf_exceptions import EDTFParseException
from wikidata.client import Client as WikidataClient

from IPython.display import display, HTML, Image


class UnknownClaimID(Exception):
    """Raised when no Wikidata claim ID is known for the requested data or source name."""


class Wikidata:
    """
    Imports Wikidata information for Works and Creators.

    The most recently fetched entity is cached on ``self.entity`` so that
    consecutive lookups for the same Wikidata ID don't re-fetch it.
    """

    # Data names accepted by get_data(), mapped to Wikidata property IDs.
    _CLAIM_IDS = {
        'imdb_id': 'P345',
        'tmdb_person': 'P4985',
        'tmdb_movie': 'P4947',
        'tmdb_tv': 'P4983',
        'viaf_id': 'P214',
        'loc_auth_id': 'P244',
        'worldcat_id': 'P7859',
        'date_of_birth': 'P569',
        'date_of_death': 'P570',
        'country_of_citizenship': 'P27',
        # NOTE(review): P1477 looks like Wikidata's "birth name" property
        # rather than a general alias list - confirm this is what is wanted.
        'also_known_as': 'P1477',
    }

    # Lower-cased TMDB source names accepted by get_entity_by_tmdb_id(),
    # mapped to Wikidata property IDs.
    _TMDB_CLAIM_IDS = {
        'tmdb-person': 'P4985',
        'tmdb-movie': 'P4947',
        'tmdb-tv': 'P4983',
    }

    def __init__(self):
        self.wikidata_client = WikidataClient()
        # Most recently fetched entity; None until get_entity() is called.
        self.entity = None

    def _ensure_entity(self, wikidata_id):
        """
        Make sure self.entity is the entity for wikidata_id, fetching it if needed.

        The cached entity is reused only when its ID matches wikidata_id (or when
        no wikidata_id is given), so data is never returned for a different,
        previously loaded entity.
        """
        cached_id = getattr(self.entity, 'id', None)
        if self.entity is None or (wikidata_id and cached_id != wikidata_id):
            self.get_entity(wikidata_id)
        return self.entity

    def sparql_query(self, query):
        """
        Send a sparql request to Wikidata.

        Returns the list of result bindings, or None if the response doesn't
        contain any. Raises requests.HTTPError for an unsuccessful response.
        """
        response = requests.get(
            'https://query.wikidata.org/sparql',
            params={
                'format': 'json',
                'query': query,
            },
            timeout=120,
        )
        response.raise_for_status()
        try:
            return response.json()['results']['bindings']
        except KeyError:
            return None

    def search(self, query):
        """
        Search Wikidata for any records it has for a query string.

        Returns the list of search results, or None if the response doesn't
        contain any. Raises requests.HTTPError for an unsuccessful response.
        """
        response = requests.get(
            'https://www.wikidata.org/w/api.php',
            params={
                'action': 'wbsearchentities',
                'format': 'json',
                'language': 'en',
                'search': query,
            },
            timeout=60,
        )
        response.raise_for_status()
        try:
            return response.json()['search']
        except KeyError:
            return None

    def get_entity(self, wikidata_id):
        """
        Get the entity from Wikidata by ID.

        The entity is also cached on self.entity.
        """
        self.entity = self.wikidata_client.get(wikidata_id, load=True)
        return self.entity

    def get_entity_by_tmdb_id(self, source_name, tmdb_id):
        """
        Get the entity from Wikidata by its TMDB ID.

        source_name is matched case-insensitively against 'tmdb-person',
        'tmdb-movie' and 'tmdb-tv'; anything else raises UnknownClaimID.
        Returns None when Wikidata has no entity with that TMDB ID.
        """
        claim_id = self._TMDB_CLAIM_IDS.get(source_name.lower())
        if claim_id is None:
            raise UnknownClaimID(f"Sorry, a TMDB claim ID for {source_name} isn't implemented yet.")
        # Escape backslashes and quotes so the ID can't break out of the
        # SPARQL string literal.
        safe_tmdb_id = str(tmdb_id).replace('\\', '\\\\').replace('"', '\\"')
        query = (
            'PREFIX wdt: <http://www.wikidata.org/prop/direct/>\n'
            f'SELECT * {{ ?item wdt:{claim_id} "{safe_tmdb_id}" }}'
        )
        matches = self.sparql_query(query)
        if not matches:
            return None
        # The item value is an entity URI; its last path segment is the ID.
        wikidata_id = matches[0]['item']['value'].split('/')[-1]
        return self.get_entity(wikidata_id)

    def get_wikipedia_extract(self, wikidata_id):
        """
        Get the Wikipedia extract for this Wikidata ID.
        Returns the extract and Wikipedia URL (either may be None).
        """
        wikipedia_extract = None
        wikipedia_url = None
        self._ensure_entity(wikidata_id)
        try:
            response = requests.get(
                'https://en.wikipedia.org/w/api.php',
                params={
                    'action': 'query',
                    'format': 'json',
                    'titles': self.entity.data['sitelinks']['enwiki']['title'],
                    'prop': 'info|extracts',
                    'exintro': True,
                    'explaintext': True,
                    'inprop': 'url',
                },
                timeout=60,
            )
            response.raise_for_status()
            # Only one title is requested, so use the first page returned;
            # an empty page mapping falls through to the KeyError handler.
            wikipedia_page = next(
                iter(response.json()['query']['pages'].values()),
                {},
            )
            wikipedia_extract = wikipedia_page['extract']
            wikipedia_url = wikipedia_page['canonicalurl']
        except KeyError:
            pass
        return wikipedia_extract, wikipedia_url

    def get_data(self, wikidata_id, data_name):
        """
        Get an external ID (or other claim value) from the Wikidata entity.

        Returns the value of the first claim for data_name, or None if the
        entity has no such claim. Raises UnknownClaimID for an unsupported
        data_name.
        """
        self._ensure_entity(wikidata_id)

        claim_id = self._CLAIM_IDS.get(data_name)
        if claim_id is None:
            raise UnknownClaimID(f"Sorry, a claim ID for {data_name} isn't implemented yet.")

        try:
            return self.entity.data['claims'][claim_id][0]['mainsnak']['datavalue']['value']
        except (KeyError, IndexError):
            # Claim missing, empty, or without a concrete value.
            return None

    def get_image_url(self, wikidata_id):
        """
        Get an Image url from the Wikidata entity.

        Returns None if the entity has no image (P18) claim.
        """
        self._ensure_entity(wikidata_id)
        try:
            return self.entity[self.wikidata_client.get('P18')].image_url
        except KeyError:
            return None

    def get_image_license(self, wikidata_id):
        """
        Retrieve the license information from Wikimedia commons.
        Returns an HTML formatted string, or None if the entity has no image
        or the license metadata is incomplete.
        """
        license_html = None
        self._ensure_entity(wikidata_id)
        try:
            filename = self.entity[self.wikidata_client.get('P18')].attributes['title']
            response = requests.get(
                'https://commons.wikimedia.org/w/api.php',
                params={
                    'action': 'query',
                    'format': 'json',
                    'iiprop': 'extmetadata',
                    'prop': 'imageinfo',
                    'titles': filename,
                },
                timeout=60,
            )
            response.raise_for_status()
            pages = response.json()['query']['pages']
            # Only one title is requested, so use the first page returned.
            first_page = next(iter(pages.values()), {})
            license_information = first_page['imageinfo'][0]['extmetadata']
            artist = license_information['Artist']['value']
            wikimedia_page_url = f'https://commons.wikimedia.org/wiki/{filename}'
            license_name = license_information['LicenseShortName']['value']
            if license_name == 'Public domain':
                license_url = 'https://wikipedia.org/wiki/Wikipedia:Public_domain'
            else:
                license_url = license_information['LicenseUrl']['value']
            license_html = f'<p>{artist}/<a href="{wikimedia_page_url}">Wikimedia</a> '\
                           f'(<a href="{license_url}">{license_name}</a>)</p>'
        except (KeyError, IndexError):
            pass
        return license_html

    def print_all(self, wikidata_id):
        """
        Print all Wikidata data for an Entity ID.

        Fields that Wikidata doesn't have for the entity are skipped.
        """
        entity = self.get_entity(wikidata_id)
        if not entity:
            return

        for label, data_name in (
            ('Date of birth', 'date_of_birth'),
            ('Date of death', 'date_of_death'),
        ):
            date_data = self.get_data(entity.id, data_name)
            time_value = date_data.get('time') if isinstance(date_data, dict) else None
            iso8601_date = self.wikidata_date_to_iso8601_date(time_value)
            if iso8601_date:
                print(f'{label}: {iso8601_date}')

        country_entity_data = self.get_data(entity.id, 'country_of_citizenship')
        if country_entity_data:
            country_entity = self.wikidata_client.get(country_entity_data['id'])
            if country_entity and country_entity.label:
                print(f'Country of citizenship: {country_entity.label}')

        wikipedia_extract, wikipedia_page = self.get_wikipedia_extract(entity.id)
        if wikipedia_extract:
            print(f'Wikipedia extract: {wikipedia_extract}')
        if wikipedia_page:
            print(f'Wikipedia URL: {wikipedia_page}')

        # get_image_url() returns None for entities without an image.
        image_url = self.get_image_url(entity.id)
        if image_url:
            # Django removes "%" symbols when saving to the database
            image_url = image_url.replace('%', '')
            print(f'Image URL: {image_url}')
            display(Image(url=image_url))

        credit_line = self.get_image_license(entity.id)
        if credit_line:
            display(HTML(credit_line))

        also_known_as = self.get_data(entity.id, 'also_known_as')
        if also_known_as:
            print(f"AKA: {also_known_as['text']}")

    def wikidata_date_to_iso8601_date(self, wikidata_date):
        """
        Convert Wikidata date with signed year to iso8601 date format.
        e.g. +1953-07-01T00:00:00Z to 1953-07-01

        Returns None when no date is given or it can't be parsed.
        """
        if not wikidata_date:
            return None

        # Remove signed year
        if wikidata_date.startswith('+'):
            wikidata_date = wikidata_date[1:]

        try:
            edtf_format = parse_edtf(wikidata_date)
            return str(struct_time_to_date(edtf_format.lower_strict()))
        except (EDTFParseException, ValueError):
            return None

    def get_wikidata_entities_with_acmi_ids(self):
        """
        Get all Wikidata entities that have ACMI IDs (P7003 claims).

        Returns a dictionary mapping the first segment of each ACMI ID
        (e.g. 'works', 'creators') to a set of tuples.
        e.g. 'works': {(Wikidata ID, XOS Work ID)}
        Returns an empty dictionary when the query yields no results.
        """
        wikidata_entities = {}
        query = 'select ?acmi_id ?wikidata_id where { ?wikidata_id wdt:P7003 ?acmi_id }'
        # sparql_query() returns None when the response has no bindings.
        for item in self.sparql_query(query) or []:
            wikidata_id = item['wikidata_id']['value'].split('/')[-1]
            acmi_id_parts = item['acmi_id']['value'].split('/')
            wikidata_entities.setdefault(acmi_id_parts[0], set()).add(
                (wikidata_id, int(acmi_id_parts[-1]))
            )
        return wikidata_entities


In [24]:
# Example usage getting all information available
# print_all() fetches the entity and prints/displays each field it finds
# (dates, citizenship, Wikipedia extract and URL, image, credit line, AKA).
wikidata = Wikidata()
wikidata.print_all('Q438911')

Date of birth: 1950-12-18
Country of citizenship: Australia
Wikipedia extract: Gillian May Armstrong (born 18 December 1950) is an Australian feature film and documentary director, best known for My Brilliant Career, Little Women, The Last Days of Chez Nous, and Mrs. Soffel. She is a Member of the Order of Australia.She has won multiple awards including an AFI Best Director Award, and has been nominated for numerous other awards including a Palme D'Or  and two Golden Bear Awards. She has received multiple Honorary Doctorates including an Honorary Doctor of Letter Degree from University of Sydney, and an Honorary Doctorate from Swinburne University of Technology.
Wikipedia URL: https://en.wikipedia.org/wiki/Gillian_Armstrong
Image URL: https://upload.wikimedia.org/wikipedia/commons/1/11/Gillian_Armstrong.jpg


In [27]:
# Example searching for a person in Wikidata
# search() returns the 'search' list from the wbsearchentities API response;
# leaving the call as the cell's last expression displays that list.
wikidata = Wikidata()
wikidata.search('Tilda Swinton')

[{'id': 'Q200534',
  'title': 'Q200534',
  'pageid': 197299,
  'display': {'label': {'value': 'Tilda Swinton', 'language': 'en'},
   'description': {'value': 'Scottish-British actress', 'language': 'en'}},
  'repository': 'wikidata',
  'url': '//www.wikidata.org/wiki/Q200534',
  'concepturi': 'http://www.wikidata.org/entity/Q200534',
  'label': 'Tilda Swinton',
  'description': 'Scottish-British actress',
  'match': {'type': 'label', 'language': 'en', 'text': 'Tilda Swinton'}},
 {'id': 'Q113726485',
  'title': 'Q113726485',
  'pageid': 108483900,
  'display': {'label': {'value': 'Tilda Swinton. The Love Factory',
    'language': 'en'},
   'description': {'value': '2002 short film directed by Luca Guadagnino',
    'language': 'en'}},
  'repository': 'wikidata',
  'url': '//www.wikidata.org/wiki/Q113726485',
  'concepturi': 'http://www.wikidata.org/entity/Q113726485',
  'label': 'Tilda Swinton. The Love Factory',
  'description': '2002 short film directed by Luca Guadagnino',
  'match': 

In [29]:
# Example getting a Wikidata entity by a TMDB ID
# The source name is lower-cased before matching, so 'TMDB-Person' selects
# the 'tmdb-person' claim (P4985); the returned entity is the cell's output.
wikidata = Wikidata()
wikidata.get_entity_by_tmdb_id('TMDB-Person', '1620')

<wikidata.entity.Entity Q214289 'Michelle Yeoh'>

In [25]:
# Example counting the ACMI IDs in Wikidata, grouped by record type
wikidata = Wikidata()
acmi_links = wikidata.get_wikidata_entities_with_acmi_ids()
works_total = len(acmi_links["works"])
creators_total = len(acmi_links["creators"])
print(f'There are {works_total} ACMI Works, and {creators_total} ACMI Creators in Wikidata')

There are 7153 ACMI Works, and 17202 ACMI Creators in Wikidata
