# Savoir sélectionner des données

Nous allons voir comment lire le dump de Wikidata et y sélectionner des données pour fabriquer notre propre CSV.

## Comment lire le Json du dump de Wikidata ?

On retire les lignes vides ou qui ne contiennent pas d'enregistrement et la virgule à la fin.

In [2]:
import json
#import pyspark
#sc = pyspark.SparkContext('local[*]')

wikiFile = sc.textFile('wikidata-sample.json')

#remove the [ ] in the file and the last comma
rdd = wikiFile\
        .filter(lambda x: '{' in x)\
        .map(lambda x: json.loads(x[:-1]))\
        .take(1)

formatted = json.dumps(rdd, sort_keys=True, indent=4, separators=(',', ': '))
print(formatted)

[
    {
        "aliases": {
            "ar": [
                {
                    "language": "ar",
                    "value": "\u0625\u0633\u0643\u062a\u0644\u0646\u062f\u0627"
                }
            ],
            "be-tarask": [
                {
                    "language": "be-tarask",
                    "value": "\u0428\u043a\u043e\u0446\u044b\u044f"
                }
            ],
            "ca": [
                {
                    "language": "ca",
                    "value": "Scotland"
                }
            ],
            "en": [
                {
                    "language": "en",
                    "value": "Alba"
                },
                {
                    "language": "en",
                    "value": "Scotland, United Kingdom"
                },
                {
                    "language": "en",
                    "value": "SCT"
                },
                {
                    "language": "en",
              

Avec avoir charger le Json, on peut ensuite intérroger les champs.

In [3]:
import json

wikiFile = sc.textFile('wikidata-sample.json')

extractWikidata = wikiFile \
  .filter(lambda x: '{' in x) \
  .map(lambda x: json.loads(x[:-1])) \
  .map(lambda x: x.get('id')) \
  .take(10)

print(extractWikidata)

['Q22', 'Q31', 'Q1', 'Q13', 'Q23', 'Q35', 'Q44', 'Q64', 'Q82', 'Q84']


## Comment créer un RDD (Resilient Distributed Dataset) de type KV (Key Value) ?

Il est possible de fabriquer directement des DataFrame pour ensuite faire des requêtes SQL mais il est plus intéressant de sélectionner uniquement les données au travers de RDD pour permettre l'utilisation de cluster par la suite.

Il nous faut commencer par générer la clé de nos pairs qui doit être un entier pour permettre d'utiliser la fonction join.

In [4]:
idWikidata = "Q4645654"
print(idWikidata)
print(type(idWikidata))

keyString = idWikidata[1:]
print(keyString)
print(type(keyString))

key = int(idWikidata[1:])
print(key)
print(type(key))

Q4645654
<class 'str'>
4645654
<class 'str'>
4645654
<class 'int'>


On peut maintenant générer des RDD KV. Exemple :

In [5]:
import json

wikiFile = sc.textFile('wikidata-sample.json')

rddKV = wikiFile \
  .filter(lambda x: '{' in x) \
  .map(lambda x: json.loads(x[:-1])) \
  .map(lambda x: (
                  #Calculate the Key
                   int(x.get('id')[1:]),
                  #Select Value
                  x
                  )
      )\
  .take(1)

print(rddKV)

[(22, {'type': 'item', 'id': 'Q22', 'labels': {'en-gb': {'language': 'en-gb', 'value': 'Scotland'}, 'en': {'language': 'en', 'value': 'Scotland'}, 'fr': {'language': 'fr', 'value': 'Écosse'}, 'it': {'language': 'it', 'value': 'Scozia'}, 'pl': {'language': 'pl', 'value': 'Szkocja'}, 'eo': {'language': 'eo', 'value': 'Skotlando'}, 'nb': {'language': 'nb', 'value': 'Skottland'}, 'de': {'language': 'de', 'value': 'Schottland'}, 'ru': {'language': 'ru', 'value': 'Шотландия'}, 'ja': {'language': 'ja', 'value': 'スコットランド'}, 'zh-hant': {'language': 'zh-hant', 'value': '蘇格蘭'}, 'es': {'language': 'es', 'value': 'Escocia'}, 'en-ca': {'language': 'en-ca', 'value': 'Scotland'}, 'nl': {'language': 'nl', 'value': 'Schotland'}, 'mk': {'language': 'mk', 'value': 'Шкотска'}, 'br': {'language': 'br', 'value': 'Bro-Skos'}, 'af': {'language': 'af', 'value': 'Skotland'}, 'an': {'language': 'an', 'value': 'Escocia'}, 'ang': {'language': 'ang', 'value': 'Scotland'}, 'ar': {'language': 'ar', 'value': 'اسكتلندا'

## Comment extraire une partie des données ?

Ici, nous avons fabriqué deux fonctions pour extraire le label et une propriété.

In [11]:
import json

wikiFile = sc.textFile('wikidata-sample.json')

def extractProperty(entity, prop):
    try: return entity.get('claims').get(prop)[0].get('mainsnak').get('datavalue').get('value').get('id') 
    except: return ''
    
def extractLabel(entity, lang):
    try: return entity.get('labels').get(lang).get('value')
    except: return ''

rddExtractWikidata = wikiFile \
  .filter(lambda x: '{' in x) \
  .map(lambda x: json.loads(x[:-1])) \
  .map(lambda x: ( int(x.get('id')[1:]),   
                      {
                          'id' : x.get('id'),
                          'label_en': extractLabel(x,'en') ,
                           'P734': extractProperty(x, 'P734')
                      }
                 )\
       )  
    
print(rddExtractWikidata.take(5))

[(22, {'id': 'Q22', 'label_en': 'Scotland', 'P734': ''}), (31, {'id': 'Q31', 'label_en': 'Belgium', 'P734': ''}), (1, {'id': 'Q1', 'label_en': 'universe', 'P734': ''}), (13, {'id': 'Q13', 'label_en': 'triskaidekaphobia', 'P734': ''}), (23, {'id': 'Q23', 'label_en': 'George Washington', 'P734': 'Q2550388'})]


## Comment remplacer un ID par son label dans un RDD ?

On commence par fabriquer un RDD qui contiendra uniquement les entités qui possédent la propriété qui nous intéresse afin de réduire le nombre d'entités.

On remplace la clé pour l'id que l'on cherche à résoudre.

In [12]:
def changeKey(x,newKey):
    K, V = x
    if V[newKey] == '' :
        return []
    else:        
        return [(int(V[newKey][1:]), V)]

rddP734 =  rddExtractWikidata.flatMap(lambda x: changeKey(x,'P734'))
print(rddP734.take(5))

[(2550388, {'id': 'Q23', 'label_en': 'George Washington', 'P734': 'Q2550388'}), (18375255, {'id': 'Q306', 'label_en': 'Sebastián Piñera', 'P734': 'Q18375255'}), (16871131, {'id': 'Q400', 'label_en': 'Jenna Jameson', 'P734': 'Q16871131'}), (18371366, {'id': 'Q449', 'label_en': 'Georges Brassens', 'P734': 'Q18371366'}), (18368860, {'id': 'Q464', 'label_en': 'Philipp Ludwig von Seidel', 'P734': 'Q18368860'})]


Maintenant, on fabrique un autre RDD qui n'aura que son ID et un label avec un nom d'attribut adapté.

In [13]:
rddLabel = rddExtractWikidata.mapValues(lambda v: 
                              {'P734_Label_en': v['label_en']}
                          )
print(rddLabel.take(5))

[(22, {'P734_Label_en': 'Scotland'}), (31, {'P734_Label_en': 'Belgium'}), (1, {'P734_Label_en': 'universe'}), (13, {'P734_Label_en': 'triskaidekaphobia'}), (23, {'P734_Label_en': 'George Washington'})]


On fait maintenant une jointure pour obtenir toutes les informations qui nous permettra de fabriquer une vue utilisable.

In [16]:
#And finally the join
rddJoinP734 = rddP734.join(rddLabel)
print(rddJoinP734.take(5))

[(2550388, ({'id': 'Q23', 'label_en': 'George Washington', 'P734': 'Q2550388'}, {'P734_Label_en': 'Washington'}))]


## Comment sauvegarder mes données dans un CSV ?

Pour sauvegarder un RDD, on peut passer par un DataFrame pour le faire simplement.

In [19]:
#Organize header of CSV
rddForDataFrame = rddJoinP734.map(lambda x:  
                                       Row(                                           
                                           idWikidata=x[1][0]["id"], 
                                           label_en=x[1][0]["label_en"], 
                                           P734=x[1][0]["P734"], 
                                           P734_Label_en=x[1][1]["P734_Label_en"]
                                          )
                                      )
#Create the dataframe
df = spark.createDataFrame(rddForDataFrame)

#Save the dataframe 
#in one file
df.coalesce(1).write.csv('rddForDataFrameCSV',header=True)
#df.write.csv('folderOfMyFileCSV',header=True)